Simplify write-cache (SUPPORT) #353

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:simplify-cache-support into support/v0.36 2023-07-26 21:07:58 +00:00
9 changed files with 22 additions and 392 deletions

View file

@@ -70,7 +70,7 @@ func (c *cache) flushSmallObjects() {
m = m[:0]
c.modeMtx.RLock()

I am open to suggestions here.

Removed this, made closing a two-step process.
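
For context, a condensed sketch of the resulting two-step close, pieced together from the Close() diff further down (an approximation, not a verbatim copy):

func (c *cache) Close() error {
	// Step 1: under the mode mutex, signal the workers to stop and
	// switch to degraded mode so no new operations are accepted.
	c.modeMtx.Lock()
	if c.closeCh != nil {
		close(c.closeCh)
	}
	c.mode = mode.DegradedReadOnly
	c.modeMtx.Unlock()

	// Step 2: wait for the background workers outside the lock (they may
	// take modeMtx themselves), then release the remaining resources.
	c.wg.Wait()
	var err error
	if c.db != nil {
		err = c.db.Close()
	}
	return err
}
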
if c.readOnly() || !c.initialized.Load() {
if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@@ -109,10 +109,6 @@ func (c *cache) flushSmallObjects() {
var count int
for i := range m {
if c.flushed.Contains(m[i].addr) {
continue
}
obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
@@ -151,9 +147,6 @@ func (c *cache) workerFlushBig() {
if c.readOnly() {
c.modeMtx.RUnlock()
break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
}
_ = c.flushFSTree(true)
@@ -181,10 +174,6 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f()
if err != nil {
c.reportFlushError("can't read a file", sAddr, err)
@@ -212,9 +201,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err
}
// mark object as flushed
c.flushed.Add(sAddr, false)
c.deleteFromDisk([]string{sAddr})
return nil
}
@@ -236,9 +223,12 @@ func (c *cache) workerFlushSmall() {
}
err := c.flushObject(obj, nil)
if err == nil {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
}
}
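
Taken together with the hunk above, the small-object flush worker now drops an entry from the cache database as soon as the flush succeeds, instead of recording it in the flushed LRU. A sketch of the resulting loop (the channel receive is reconstructed from surrounding code, so treat it as an assumption):

func (c *cache) workerFlushSmall() {
	defer c.wg.Done()
	for {
		// Objects to flush arrive over the flush channel.
		var obj *object.Object
		select {
		case obj = <-c.flushCh:
		case <-c.closeCh:
			return
		}
		if err := c.flushObject(obj, nil); err != nil {
			// Error is handled in flushObject.
			continue
		}
		// Success: the object is already in the main storage, so it can be
		// removed from the write-cache database immediately.
		c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
	}
}
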
@@ -294,10 +284,6 @@ func (c *cache) flush(ignoreErrors bool) error {
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err)
if ignoreErrors {

View file

@@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -15,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(false))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("flush on moving to degraded mode", func(t *testing.T) {
@@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("ignore errors", func(t *testing.T) {
@@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
})
})
})
t.Run("on init", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// not found
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// ok
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
}
require.NoError(t, wc.Close())
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
for i := range objects {
var prm meta.PutPrm
prm.SetObject(objects[i].obj)
_, err := mb.Put(prm)
require.NoError(t, err)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
inhumePrm.SetTombstoneAddress(oidtest.Address())
_, err := mb.Inhume(inhumePrm)
require.NoError(t, err)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
_, err = mb.Delete(deletePrm)
require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i)
}
}
})
}
func putObject(t *testing.T, c Cache, size int) objectPair {
@@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}

View file

@@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
@@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
c.flushed.Get(saddr)
return res.Object, nil
}

View file

@@ -1,183 +0,0 @@
package writecache
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func (c *cache) initFlushMarks() {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate()
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate()
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(prm)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
}
}
}
return nil
}
_, _ = c.fsTree.Iterate(prm)
c.log.Info("finished updating FSTree flush marks")
}
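
The errStopIter sentinel above is the usual Go pattern for aborting a callback-driven iteration early without reporting a real failure (note the caller discards the iterator's error with _, _ = c.fsTree.Iterate(prm)). A standalone sketch of the pattern, with illustrative names:

var errStopIter = errors.New("stop iteration")

// iterate calls fn for each key and stops at the first error.
func iterate(keys []string, fn func(string) error) error {
	for _, k := range keys {
		if err := fn(k); err != nil {
			return err
		}
	}
	return nil
}

func walk(keys []string, stop <-chan struct{}, process func(string)) error {
	err := iterate(keys, func(k string) error {
		select {
		case <-stop:
			return errStopIter // abort the walk on shutdown
		default:
		}
		process(k)
		return nil
	})
	if errors.Is(err, errStopIter) {
		err = nil // early exit is not a failure
	}
	return err
}
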
func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database")
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]
// We process objects in batches of fixed size so as not to interfere too much with the main put cycle.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
m = append(m, string(k))
}
return nil
})
var addr oid.Address
for i := range m {
if err := addr.DecodeString(m[i]); err != nil {
continue
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log,
zap.String("address", m[j]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info("finished updating flush marks")
}
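
The (now deleted) loop above uses keyset pagination over bbolt: remember the last key of each batch and resume the cursor just past it on the next pass. Appending a zero byte to the last key produces the smallest key that sorts strictly after it, so Seek lands on the first unvisited entry. A standalone sketch of the pattern (bucket name and batch size are illustrative):

func iterateInBatches(db *bbolt.DB, bucket []byte, batchSize int, fn func(key []byte)) error {
	var lastKey []byte
	for {
		count := 0
		err := db.View(func(tx *bbolt.Tx) error {
			cs := tx.Bucket(bucket).Cursor()
			for k, _ := cs.Seek(lastKey); k != nil && count < batchSize; k, _ = cs.Next() {
				fn(k)
				// Keys are only valid inside the transaction, so copy.
				lastKey = append(lastKey[:0], k...)
				count++
			}
			return nil
		})
		if err != nil || count == 0 {
			return err
		}
		lastKey = append(lastKey, 0) // resume strictly after the last visited key
	}
}
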
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(existsPrm)
if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
return needRemove, needRemove
}
var prm meta.StorageIDPrm
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}
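
A note on the two error checks in flushStatus: errors.Is matches a sentinel error by identity anywhere in the wrap chain, while errors.As matches by concrete type. The same pattern in isolation, with stand-in types rather than the real SDK ones:

var errObjectExpired = errors.New("object is expired") // stands in for meta.ErrObjectIsExpired

type objectAlreadyRemoved struct{} // stands in for apistatus.ObjectAlreadyRemoved

func (objectAlreadyRemoved) Error() string { return "object already removed" }

func needRemove(err error) bool {
	// Either condition means the object is gone from the metabase,
	// so its write-cache copy can be dropped safely.
	return errors.Is(err, errObjectExpired) || errors.As(err, new(objectAlreadyRemoved))
}
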

View file

@@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return prm.handler(data)
})
})
@@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f()
if err != nil {
if prm.ignoreErrors {

View file

@@ -36,19 +36,6 @@ func (c *cache) setMode(m mode.Mode) error {
}
}
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks()
}
}()
}
if c.db != nil {
if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)

View file

@@ -26,8 +26,6 @@ func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))

View file

@@ -11,8 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@@ -20,19 +18,7 @@ import (
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
maxFlushedMarksCount int
maxRemoveBatchSize int
// flushed contains addresses of objects that were already flushed to the main storage.
// We use an LRU cache instead of a map here to facilitate removal of unused objects in favour of
// frequently read ones.
// MUST NOT be used inside a bolt db transaction because its eviction handler
// removes untracked items from the database.
flushed simplelru.LRUCache[string, bool]
db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
}
const dbName = "small.bolt"
@@ -71,35 +57,9 @@ func (c *cache) openStore(readOnly bool) error {
return fmt.Errorf("could not open FSTree: %w", err)
}
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil
}
// removeFlushed removes an object from the writecache.
// To minimize interference with client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an eviction callback for the LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
} else {
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
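
For context on the mechanism being deleted: golang-lru/v2 accepts an eviction callback that fires whenever an entry is displaced from the cache, and removeFlushed was exactly such a callback. A minimal sketch of that API (capacity and values are illustrative):

// The callback receives the evicted key/value pair.
flushed, err := lru.NewWithEvict[string, bool](2, func(key string, fromDatabase bool) {
	fmt.Println("evicted:", key, fromDatabase)
})
if err != nil {
	return err
}
flushed.Add("addr1", true)
flushed.Add("addr2", false)
flushed.Add("addr3", true) // over capacity: "addr1" is evicted and the callback fires
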
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys

View file

@@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -52,9 +51,6 @@ type cache struct {
mtx sync.RWMutex
mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
@@ -97,7 +93,6 @@ func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
compressFlags: make(map[string]struct{}),
options: options{
@@ -116,12 +111,6 @@ func New(opts ...Option) Cache {
opts[i](&c.options)
}
// Make the LRU cache contain entries which take approximately 3/4 of the maximum space.
// Assume small and big objects are stored in 50-50 proportion.
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
return c
}
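
To make the removed sizing heuristic concrete, here is the arithmetic with illustrative settings maxCacheSize = 1 GiB, maxObjectSize = 64 MiB, smallObjectSize = 32 KiB (example values, not necessarily the defaults):

maxCacheSize / maxObjectSize   = 16    // capacity measured in big objects
maxCacheSize / smallObjectSize = 32768 // capacity measured in small objects
maxFlushedMarksCount = (16 + 32768) / 2 * 3 / 4 = 12294
maxRemoveBatchSize   = 12294 / 8 = 1536
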
@@ -152,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
c.initFlushMarks()
c.runFlushLoop()
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
// We cannot lock the mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// Finish all in-progress operations.
if err := c.setMode(mode.ReadOnly); err != nil {
return err
}
if c.closeCh != nil {
close(c.closeCh)
}
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
c.wg.Wait()
ale64bit marked this conversation as resolved (outdated)

are these supposed to be committed?

fixed
if c.closeCh != nil {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.closeCh = nil
}
c.initialized.Store(false)
var err error
if c.db != nil {
err = c.db.Close()