[#314] writecache: remove objects right after they are flushed

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-05-10 17:43:49 +03:00 committed by Evgenii Stratonikov
parent bf79d06f03
commit 35c9b6b26d
9 changed files with 25 additions and 408 deletions

View file

@ -78,7 +78,7 @@ func (c *cache) flushSmallObjects() {
m = m[:0]
c.modeMtx.RLock()
if c.readOnly() || !c.initialized.Load() {
if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@ -117,10 +117,6 @@ func (c *cache) flushSmallObjects() {
var count int
for i := range m {
if c.flushed.Contains(m[i].addr) {
continue
}
obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
@ -157,9 +153,6 @@ func (c *cache) workerFlushBig(ctx context.Context) {
if c.readOnly() {
c.modeMtx.RUnlock()
break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
}
_ = c.flushFSTree(ctx, true)
@ -187,10 +180,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f()
if err != nil {
c.reportFlushError("can't read a file", sAddr, err)
@ -218,9 +207,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
// mark object as flushed
c.flushed.Add(sAddr, false)
c.deleteFromDisk(ctx, []string{sAddr})
return nil
}
@ -242,9 +229,12 @@ func (c *cache) workerFlushSmall() {
}
err := c.flushObject(context.TODO(), obj, nil)
if err == nil {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
}
}
@ -306,10 +296,6 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err)
if ignoreErrors {

View file

@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -15,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(context.Background(), false))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(context.Background(), mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("flush on moving to degraded mode", func(t *testing.T) {
@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(context.Background(), mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("ignore errors", func(t *testing.T) {
@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
})
})
})
t.Run("on init", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// not found
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// ok
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
}
require.NoError(t, wc.Close())
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
for i := range objects {
var prm meta.PutPrm
prm.SetObject(objects[i].obj)
_, err := mb.Put(context.Background(), prm)
require.NoError(t, err)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
inhumePrm.SetTombstoneAddress(oidtest.Address())
_, err := mb.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
_, err = mb.Delete(context.Background(), deletePrm)
require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i)
}
}
})
}
func putObject(t *testing.T, c Cache, size int) objectPair {
@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}

View file

@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
c.flushed.Get(saddr)
return res.Object, nil
}

View file

@ -1,192 +0,0 @@
package writecache
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func (c *cache) initFlushMarks(ctx context.Context) {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate(ctx)
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate(ctx)
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(ctx, addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(ctx, prm)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
}
}
}
return nil
}
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
_, _ = c.fsTree.Iterate(prm)
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
}
func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
m = append(m, string(k))
}
return nil
})
var addr oid.Address
for i := range m {
if err := addr.DecodeString(m[i]); err != nil {
continue
}
flushed, needRemove := c.flushStatus(ctx, addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log,
zap.String("address", m[j]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
}
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(ctx, existsPrm)
if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
return needRemove, needRemove
}
var prm meta.StorageIDPrm
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(ctx, prm)
res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}

View file

@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return prm.handler(data)
})
})
@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f()
if err != nil {
if prm.ignoreErrors {

View file

@ -37,22 +37,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
var err error
turnOffMeta := m.NoMetabase()
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks(ctx)
}
}()
}
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(ctx, true)
if err != nil {

View file

@ -37,8 +37,6 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))

View file

@ -13,8 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@ -22,19 +20,7 @@ import (
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
maxFlushedMarksCount int
maxRemoveBatchSize int
// flushed contains addresses of objects that were already flushed to the main storage.
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
// frequently read ones.
// MUST NOT be used inside bolt db transaction because it's eviction handler
// removes untracked items from the database.
flushed simplelru.LRUCache[string, bool]
db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
db *bbolt.DB
}
const dbName = "small.bolt"
@ -73,35 +59,9 @@ func (c *cache) openStore(readOnly bool) error {
return fmt.Errorf("could not open FSTree: %w", err)
}
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil
}
// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
} else {
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys
@ -133,7 +93,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
return keys[:len(keys)-errorIndex]
}
func (c *cache) deleteFromDisk(keys []string) []string {
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
if len(keys) == 0 {
return keys
}
@ -147,7 +107,7 @@ func (c *cache) deleteFromDisk(keys []string) []string {
continue
}
_, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))

View file

@ -5,7 +5,6 @@ import (
"os"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -13,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -52,11 +50,8 @@ type cache struct {
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex
mode mode.Mode
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
@ -96,9 +91,8 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),
options: options{
@ -117,12 +111,6 @@ func New(opts ...Option) Cache {
opts[i](&c.options)
}
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
// Assume small and big objects are stored in 50-50 proportion.
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
return c
}
@ -153,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
defer span.End()
c.initFlushMarks(ctx)
c.runFlushLoop()
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
// Finish all in-progress operations.
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
return err
}
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
if c.closeCh != nil {
close(c.closeCh)
}
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
c.wg.Wait()
if c.closeCh != nil {
c.closeCh = nil
}
c.initialized.Store(false)
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()