WIP: Simplify write-cache #314
13 changed files with 436 additions and 521 deletions
|
@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
|
|||
func (m Mode) ReadOnly() bool {
|
||||
return m&ReadOnly != 0
|
||||
}
|
||||
|
||||
func (m Mode) ReadWrite() bool {
|
||||
return m == 0
|
||||
}
|
||||
|
|
|
@ -2,10 +2,12 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -31,14 +33,14 @@ func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
saddr := addr.EncodeToString()
|
||||
|
||||
// Check disk cache.
|
||||
var has int
|
||||
var valLen int
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
has = len(b.Get([]byte(saddr)))
|
||||
valLen = len(b.Get([]byte(saddr)))
|
||||
return nil
|
||||
})
|
||||
|
||||
if 0 < has {
|
||||
if valLen > 0 {
|
||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
err := b.Delete([]byte(saddr))
|
||||
|
@ -52,18 +54,32 @@ func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
c.objCounters.DecDB()
|
||||
c.objCounters.decDB(valLen)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
// While getting an object looks overheadly, it allows to
|
||||
// get its size correctly without any additional memory/disk/CPU
|
||||
// usage on the WC's side _for every object_. `Delete` is not
|
||||
// expected to be called right after an object is put to the
|
||||
// Write-cache often, and for non-existing objects (persisted
|
||||
// to the main storage and dropped from the WC's storage) it
|
||||
// is `os.Stat` vs `os.Remove` calls after all.
|
||||
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
|
||||
if errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(saddr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("fstree DELETE"),
|
||||
)
|
||||
c.objCounters.DecFS()
|
||||
c.objCounters.decFS(len(res.RawData))
|
||||
}
|
||||
|
||||
return err
|
||||
|
|
|
@ -11,7 +11,9 @@ import (
|
|||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/mr-tron/base58"
|
||||
|
@ -33,6 +35,11 @@ const (
|
|||
defaultFlushInterval = time.Second
|
||||
)
|
||||
|
||||
type objWithData struct {
|
||||
obj *object.Object
|
||||
data []byte
|
||||
}
|
||||
|
||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||
func (c *Cache) runFlushLoop() {
|
||||
for i := 0; i < c.workersCount; i++ {
|
||||
|
@ -58,7 +65,7 @@ func (c *Cache) runFlushLoop() {
|
|||
case <-tt.C:
|
||||
c.flushSmallObjects()
|
||||
tt.Reset(defaultFlushInterval)
|
||||
case <-c.closeCh:
|
||||
case <-c.workersChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -70,20 +77,13 @@ func (c *Cache) flushSmallObjects() {
|
|||
var m []objectInfo
|
||||
for {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
case <-c.workersChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m = m[:0]
|
||||
|
||||
c.modeMtx.RLock()
|
||||
if c.readOnly() || !c.initialized.Load() {
|
||||
c.modeMtx.RUnlock()
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
|
@ -117,31 +117,25 @@ func (c *Cache) flushSmallObjects() {
|
|||
|
||||
var count int
|
||||
for i := range m {
|
||||
if c.flushed.Contains(m[i].addr) {
|
||||
continue
|
||||
}
|
||||
|
||||
obj := object.New()
|
||||
if err := obj.Unmarshal(m[i].data); err != nil {
|
||||
data := m[i].data
|
||||
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
count++
|
||||
select {
|
||||
case c.flushCh <- obj:
|
||||
case <-c.closeCh:
|
||||
c.modeMtx.RUnlock()
|
||||
case c.smallFlushCh <- objWithData{obj: obj, data: data}:
|
||||
case <-c.workersChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
c.modeMtx.RUnlock()
|
||||
break
|
||||
}
|
||||
|
||||
c.modeMtx.RUnlock()
|
||||
|
||||
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
||||
zap.Int("count", count),
|
||||
zap.String("start", base58.Encode(lastKey)))
|
||||
|
@ -157,15 +151,12 @@ func (c *Cache) flushBigObjects(ctx context.Context) {
|
|||
if c.readOnly() {
|
||||
c.modeMtx.RUnlock()
|
||||
break
|
||||
} else if !c.initialized.Load() {
|
||||
c.modeMtx.RUnlock()
|
||||
continue
|
||||
}
|
||||
|
||||
_ = c.flushFSTree(ctx, true)
|
||||
|
||||
c.modeMtx.RUnlock()
|
||||
case <-c.closeCh:
|
||||
case <-c.workersChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -187,8 +178,12 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
sAddr := addr.EncodeToString()
|
||||
|
||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||
return nil
|
||||
select {
|
||||
case <-c.workersChan:
|
||||
return stopIter
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
|
@ -210,7 +205,7 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = c.flushObject(ctx, &obj, data)
|
||||
err = c.flushObject(ctx, objWithData{obj: &obj, data: data})
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
return nil
|
||||
|
@ -218,13 +213,23 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// mark object as flushed
|
||||
c.flushed.Add(sAddr, false)
|
||||
err = c.dropBigObject(ctx, addr, len(data))
|
||||
if err != nil {
|
||||
c.reportFlushError("can't drop an object from FSTree", sAddr, err)
|
||||
if ignoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Iterate(prm)
|
||||
if errors.Is(err, stopIter) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -232,24 +237,32 @@ func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
func (c *Cache) smallObjectsFlusher() {
|
||||
defer c.wg.Done()
|
||||
|
||||
var obj *object.Object
|
||||
var objAndData objWithData
|
||||
for {
|
||||
// Give priority to direct put.
|
||||
select {
|
||||
case obj = <-c.flushCh:
|
||||
case <-c.closeCh:
|
||||
case objAndData = <-c.smallFlushCh:
|
||||
case <-c.workersChan:
|
||||
return
|
||||
}
|
||||
|
||||
err := c.flushObject(context.TODO(), obj, nil)
|
||||
err := c.flushObject(context.TODO(), objAndData)
|
||||
if err == nil {
|
||||
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
|
||||
addr := objectCore.AddressOf(objAndData.obj)
|
||||
|
||||
err = c.dropSmallObject(context.TODO(), addr)
|
||||
if err != nil {
|
||||
c.reportFlushError("can't drop object from write-cache",
|
||||
addr.EncodeToString(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushObject is used to write object directly to the main storage.
|
||||
func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
|
||||
func (c *Cache) flushObject(ctx context.Context, objAndData objWithData) error {
|
||||
obj := objAndData.obj
|
||||
data := objAndData.data
|
||||
addr := objectCore.AddressOf(obj)
|
||||
|
||||
var prm common.PutPrm
|
||||
|
@ -272,6 +285,11 @@ func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte
|
|||
|
||||
_, err = c.metabase.UpdateStorageID(updPrm)
|
||||
if err != nil {
|
||||
if errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) {
|
||||
// object info is outdated in the WC
|
||||
return nil
|
||||
}
|
||||
|
||||
c.reportFlushError("can't update object storage ID",
|
||||
addr.EncodeToString(), err)
|
||||
}
|
||||
|
@ -299,16 +317,20 @@ func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return c.db.View(func(tx *bbolt.Tx) error {
|
||||
var dbFunc func(func(*bbolt.Tx) error) error
|
||||
if c.readOnly() {
|
||||
dbFunc = c.db.View
|
||||
} else {
|
||||
dbFunc = c.db.Update
|
||||
}
|
||||
|
||||
return dbFunc(func(tx *bbolt.Tx) error {
|
||||
var addr oid.Address
|
||||
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
|
||||
sa := string(k)
|
||||
if _, ok := c.flushed.Peek(sa); ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := addr.DecodeString(sa); err != nil {
|
||||
c.reportFlushError("can't decode object address from the DB", sa, err)
|
||||
|
@ -327,10 +349,101 @@ func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := c.flushObject(ctx, &obj, data); err != nil {
|
||||
err := c.flushObject(ctx, objWithData{obj: &obj, data: data})
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if c.readOnly() {
|
||||
continue
|
||||
}
|
||||
|
||||
removed, err := dropObject(tx, k)
|
||||
if err != nil {
|
||||
c.reportFlushError("can't drop an object from the DB", sa, err)
|
||||
if ignoreErrors {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
c.objCounters.decDB(removed)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Cache) dropSmallObject(ctx context.Context, addr oid.Address) error {
|
||||
var removedBytes int
|
||||
key := []byte(addr.EncodeToString())
|
||||
var err error
|
||||
|
||||
err = c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
select {
|
||||
case <-c.workersChan:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
removedBytes, err = dropObject(tx, key)
|
||||
|
||||
return err
|
||||
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
|
||||
if removedBytes > 0 {
|
||||
c.objCounters.decDB(removedBytes)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func dropObject(tx *bbolt.Tx, key []byte) (int, error) {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
|
||||
removedBytes := len(b.Get(key))
|
||||
if removedBytes > 0 {
|
||||
return removedBytes, b.Delete(key)
|
||||
}
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (c *Cache) dropBigObject(ctx context.Context, addr oid.Address, size int) error {
|
||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||
if err != nil {
|
||||
if errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("fstree DELETE"),
|
||||
)
|
||||
c.objCounters.decFS(size)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -106,14 +105,6 @@ func TestFlush(t *testing.T) {
|
|||
wc, bs, mb := newCache(t)
|
||||
objects := putObjects(t, wc)
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
wc.flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||
wc.flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||
|
||||
require.NoError(t, wc.Flush(context.Background(), false))
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
var mPrm meta.GetPrm
|
||||
mPrm.SetAddress(objects[i].addr)
|
||||
|
@ -124,25 +115,10 @@ func TestFlush(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
}
|
||||
|
||||
check(t, mb, bs, objects[2:])
|
||||
})
|
||||
|
||||
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
||||
wc, bs, mb := newCache(t)
|
||||
objects := putObjects(t, wc)
|
||||
|
||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||
require.Error(t, wc.SetMode(mode.Degraded))
|
||||
|
||||
// First move to read-only mode to close background workers.
|
||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
wc.flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||
wc.flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||
|
||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||
require.NoError(t, wc.Flush(context.Background(), false))
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
var mPrm meta.GetPrm
|
||||
|
@ -151,7 +127,40 @@ func TestFlush(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
|
||||
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
check(t, mb, bs, objects[2:])
|
||||
})
|
||||
|
||||
t.Run("flush on moving to degraded mode", func(t *testing.T) {
|
||||
wc, bs, mb := newCache(t)
|
||||
objects := putObjects(t, wc)
|
||||
|
||||
// Moving to the degraded mode is called with `ignoreErrors` so
|
||||
// we do not expect an error from `flush` here.
|
||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||
|
||||
// bs is read-only; so is can't get the objects
|
||||
for i := 0; i < 2; i++ {
|
||||
var mPrm meta.GetPrm
|
||||
mPrm.SetAddress(objects[i].addr)
|
||||
_, err := mb.Get(context.Background(), mPrm)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, wc.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
_, err := bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
check(t, mb, bs, objects[2:])
|
||||
|
@ -166,7 +175,6 @@ func TestFlush(t *testing.T) {
|
|||
objects := putObjects(t, wc)
|
||||
f(wc)
|
||||
|
||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
|
@ -224,7 +232,7 @@ func TestFlush(t *testing.T) {
|
|||
})
|
||||
})
|
||||
|
||||
t.Run("on init", func(t *testing.T) {
|
||||
t.Run("flush", func(t *testing.T) {
|
||||
wc, bs, mb := newCache(t)
|
||||
objects := []objectPair{
|
||||
// removed
|
||||
|
@ -260,9 +268,6 @@ func TestFlush(t *testing.T) {
|
|||
_, err = mb.Delete(context.Background(), deletePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
|
||||
// Open in read-only: no error, nothing is removed.
|
||||
require.NoError(t, wc.Open(true))
|
||||
initWC(t, wc)
|
||||
|
@ -275,13 +280,17 @@ func TestFlush(t *testing.T) {
|
|||
// Open in read-write: no error, something is removed.
|
||||
require.NoError(t, wc.Open(false))
|
||||
initWC(t, wc)
|
||||
|
||||
for i := range objects {
|
||||
_, err := wc.Get(context.Background(), objects[i].addr)
|
||||
if i < 2 {
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
|
||||
} else {
|
||||
require.NoError(t, err, i)
|
||||
}
|
||||
require.NoError(t, err, i)
|
||||
}
|
||||
|
||||
require.NoError(t, wc.Flush(context.Background(), true))
|
||||
|
||||
for i := range objects {
|
||||
_, err := wc.Get(context.Background(), objects[i].addr)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -321,10 +330,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
|
|||
|
||||
func initWC(t *testing.T, wc *Cache) {
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return wc.initialized.Load()
|
||||
}, 100*time.Second, 1*time.Millisecond)
|
||||
}
|
||||
|
||||
type dummyEpoch struct{}
|
||||
|
|
|
@ -30,7 +30,6 @@ func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
|||
value, err := Get(c.db, []byte(saddr))
|
||||
if err == nil {
|
||||
obj := objectSDK.New()
|
||||
c.flushed.Get(saddr)
|
||||
return obj, obj.Unmarshal(value)
|
||||
}
|
||||
|
||||
|
@ -39,7 +38,6 @@ func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
|||
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
c.flushed.Get(saddr)
|
||||
return res.Object, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,191 +2,33 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
)
|
||||
|
||||
func (c *Cache) initFlushMarks(ctx context.Context) {
|
||||
var localWG sync.WaitGroup
|
||||
// Init runs necessary services.
|
||||
func (c *Cache) Init() error {
|
||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
|
||||
defer span.End()
|
||||
|
||||
localWG.Add(1)
|
||||
go func() {
|
||||
defer localWG.Done()
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
c.fsTreeFlushMarkUpdate(ctx)
|
||||
}()
|
||||
|
||||
localWG.Add(1)
|
||||
go func() {
|
||||
defer localWG.Done()
|
||||
|
||||
c.dbFlushMarkUpdate(ctx)
|
||||
}()
|
||||
|
||||
c.initWG.Add(1)
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer c.initWG.Done()
|
||||
|
||||
localWG.Wait()
|
||||
|
||||
select {
|
||||
case <-c.stopInitCh:
|
||||
return
|
||||
case <-c.closeCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
c.initialized.Store(true)
|
||||
}()
|
||||
}
|
||||
|
||||
var errStopIter = errors.New("stop iteration")
|
||||
|
||||
func (c *Cache) fsTreeFlushMarkUpdate(ctx context.Context) {
|
||||
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
|
||||
|
||||
var prm common.IteratePrm
|
||||
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
return errStopIter
|
||||
case <-c.stopInitCh:
|
||||
return errStopIter
|
||||
default:
|
||||
}
|
||||
|
||||
flushed, needRemove := c.flushStatus(ctx, addr)
|
||||
if flushed {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
if needRemove {
|
||||
var prm common.DeletePrm
|
||||
prm.Address = addr
|
||||
|
||||
_, err := c.fsTree.Delete(ctx, prm)
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("fstree DELETE"),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
if c.mode.NoMetabase() {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
|
||||
_, _ = c.fsTree.Iterate(prm)
|
||||
|
||||
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
|
||||
}
|
||||
|
||||
func (c *Cache) dbFlushMarkUpdate(ctx context.Context) {
|
||||
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
|
||||
var m []string
|
||||
var indices []int
|
||||
var lastKey []byte
|
||||
var batchSize = flushBatchSize
|
||||
for {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
return
|
||||
case <-c.stopInitCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m = m[:0]
|
||||
indices = indices[:0]
|
||||
|
||||
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
|
||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
|
||||
m = append(m, string(k))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
var addr oid.Address
|
||||
for i := range m {
|
||||
if err := addr.DecodeString(m[i]); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
flushed, needRemove := c.flushStatus(ctx, addr)
|
||||
if flushed {
|
||||
c.store.flushed.Add(addr.EncodeToString(), true)
|
||||
if needRemove {
|
||||
indices = append(indices, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(m) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
for _, j := range indices {
|
||||
if err := b.Delete([]byte(m[j])); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
for _, j := range indices {
|
||||
storagelog.Write(c.log,
|
||||
zap.String("address", m[j]),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
}
|
||||
}
|
||||
lastKey = append([]byte(m[len(m)-1]), 0)
|
||||
}
|
||||
|
||||
c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
|
||||
}
|
||||
|
||||
// flushStatus returns info about the object state in the main storage.
|
||||
// First return value is true iff object exists.
|
||||
// Second return value is true iff object can be safely removed.
|
||||
func (c *Cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.SetAddress(addr)
|
||||
|
||||
_, err := c.metabase.Exists(ctx, existsPrm)
|
||||
err := c.initCounters(ctx)
|
||||
if err != nil {
|
||||
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
|
||||
return needRemove, needRemove
|
||||
return fmt.Errorf("initializing write-cache size: %w", err)
|
||||
}
|
||||
|
||||
var prm meta.StorageIDPrm
|
||||
prm.SetAddress(addr)
|
||||
if c.mode == mode.ReadWrite {
|
||||
c.workersChan = make(chan struct{})
|
||||
c.runFlushLoop()
|
||||
}
|
||||
|
||||
mRes, _ := c.metabase.StorageID(ctx, prm)
|
||||
res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
|
||||
return err == nil && res.Exists, false
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -41,9 +41,6 @@ func (c *Cache) Iterate(prm IterationPrm) error {
|
|||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
return b.ForEach(func(k, data []byte) error {
|
||||
if _, ok := c.flushed.Peek(string(k)); ok {
|
||||
return nil
|
||||
}
|
||||
return prm.handler(data)
|
||||
})
|
||||
})
|
||||
|
@ -54,9 +51,6 @@ func (c *Cache) Iterate(prm IterationPrm) error {
|
|||
var fsPrm common.IteratePrm
|
||||
fsPrm.IgnoreErrors = prm.ignoreErrors
|
||||
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
||||
return nil
|
||||
}
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
if prm.ignoreErrors {
|
||||
|
|
|
@ -16,9 +16,6 @@ import (
|
|||
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
||||
var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
|
||||
|
||||
// ErrNotInitialized is returned when write-cache is initializing.
|
||||
var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
|
||||
|
||||
// SetMode sets write-cache mode of operation.
|
||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||
// and all background jobs are suspended.
|
||||
|
@ -35,29 +32,28 @@ func (c *Cache) SetMode(m mode.Mode) error {
|
|||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||
func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
|
||||
var err error
|
||||
turnOffMeta := m.NoMetabase()
|
||||
|
||||
if !c.initialized.Load() {
|
||||
close(c.stopInitCh)
|
||||
|
||||
c.initWG.Wait()
|
||||
c.stopInitCh = make(chan struct{})
|
||||
|
||||
defer func() {
|
||||
if err == nil && !turnOffMeta {
|
||||
c.initFlushMarks(ctx)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
if turnOffMeta && !c.mode.NoMetabase() {
|
||||
var workersActive bool
|
||||
select {
|
||||
case <-c.workersChan:
|
||||
default:
|
||||
workersActive = true
|
||||
}
|
||||
|
||||
stopWorkers := m.NoMetabase() && !c.mode.NoMetabase() || c.mode.ReadWrite() && !m.ReadWrite()
|
||||
|
||||
if stopWorkers {
|
||||
err = c.flush(ctx, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if workersActive {
|
||||
close(c.workersChan)
|
||||
c.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
if c.db != nil {
|
||||
|
@ -67,14 +63,14 @@ func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
|
|||
}
|
||||
|
||||
// Suspend producers to ensure there are channel send operations in fly.
|
||||
// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
|
||||
// smallFlushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
|
||||
// guarantees that there are no in-fly operations.
|
||||
for len(c.flushCh) != 0 {
|
||||
for len(c.smallFlushCh) != 0 {
|
||||
c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if turnOffMeta {
|
||||
if m.NoMetabase() {
|
||||
c.mode = m
|
||||
return nil
|
||||
}
|
||||
|
@ -84,6 +80,16 @@ func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
|
|||
}
|
||||
|
||||
c.mode = m
|
||||
|
||||
if m == mode.ReadWrite {
|
||||
select {
|
||||
case <-c.workersChan:
|
||||
c.workersChan = make(chan struct{})
|
||||
c.runFlushLoop()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -48,8 +48,6 @@ type options struct {
|
|||
// maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
|
||||
// 1 GiB by default.
|
||||
maxCacheSize uint64
|
||||
// objCounters contains atomic counters for the number of objects stored in cache.
|
||||
objCounters counters
|
||||
// maxBatchSize is the maximum batch size for the small object database.
|
||||
maxBatchSize int
|
||||
// maxBatchDelay is the maximum batch wait time for the small object database.
|
||||
|
|
|
@ -3,6 +3,7 @@ package writecache
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -37,8 +38,6 @@ func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
defer c.modeMtx.RUnlock()
|
||||
if c.readOnly() {
|
||||
return common.PutRes{}, ErrReadOnly
|
||||
} else if !c.initialized.Load() {
|
||||
return common.PutRes{}, ErrNotInitialized
|
||||
}
|
||||
|
||||
sz := uint64(len(prm.RawData))
|
||||
|
@ -52,23 +51,42 @@ func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
data: prm.RawData,
|
||||
}
|
||||
|
||||
if sz <= c.smallObjectSize {
|
||||
return common.PutRes{}, c.putSmall(oi)
|
||||
if c.maxCacheSize < c.sizeIfAdd(sz) {
|
||||
return common.PutRes{}, ErrOutOfSpace
|
||||
}
|
||||
return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
|
||||
|
||||
if sz <= c.smallObjectSize {
|
||||
err := c.putSmall(oi)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not put small object to DB: %w", err)
|
||||
}
|
||||
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
err := c.putBig(ctx, oi.addr, prm)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not put big object to FSTree: %w", err)
|
||||
}
|
||||
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
// putSmall persists small objects to the write-cache database and
|
||||
// pushes the to the flush workers queue.
|
||||
func (c *Cache) putSmall(obj objectInfo) error {
|
||||
cacheSize := c.estimateCacheSize()
|
||||
if c.maxCacheSize < c.incSizeDB(cacheSize) {
|
||||
return ErrOutOfSpace
|
||||
}
|
||||
var alreadyExists bool
|
||||
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
return b.Put([]byte(obj.addr), obj.data)
|
||||
addr := []byte(obj.addr)
|
||||
|
||||
alreadyExists = len(b.Get(addr)) != 0
|
||||
if alreadyExists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.Put(addr, obj.data)
|
||||
})
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
|
@ -76,29 +94,22 @@ func (c *Cache) putSmall(obj objectInfo) error {
|
|||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db PUT"),
|
||||
)
|
||||
c.objCounters.IncDB()
|
||||
|
||||
if !alreadyExists {
|
||||
c.objCounters.incDB(len(obj.data))
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||
func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
|
||||
cacheSz := c.estimateCacheSize()
|
||||
if c.maxCacheSize < c.incSizeFS(cacheSz) {
|
||||
return ErrOutOfSpace
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Put(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.blobstor.NeedsCompression(prm.Object) {
|
||||
c.mtx.Lock()
|
||||
c.compressFlags[addr] = struct{}{}
|
||||
c.mtx.Unlock()
|
||||
}
|
||||
c.objCounters.IncFS()
|
||||
c.objCounters.incFS(len(prm.RawData))
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(addr),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
|
|
|
@ -1,72 +1,134 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
func (c *Cache) estimateCacheSize() uint64 {
|
||||
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
|
||||
}
|
||||
|
||||
func (c *Cache) incSizeDB(sz uint64) uint64 {
|
||||
return sz + c.smallObjectSize
|
||||
}
|
||||
|
||||
func (c *Cache) incSizeFS(sz uint64) uint64 {
|
||||
return sz + c.maxObjectSize
|
||||
func (c *Cache) sizeIfAdd(delta uint64) uint64 {
|
||||
return delta + c.objCounters.fstreeSize.Load() + c.objCounters.dbSize.Load()
|
||||
}
|
||||
|
||||
type counters struct {
|
||||
cDB, cFS atomic.Uint64
|
||||
dbSize, fstreeSize atomic.Uint64
|
||||
}
|
||||
|
||||
func (x *counters) IncDB() {
|
||||
x.cDB.Inc()
|
||||
func (x *counters) incDB(delta int) {
|
||||
x.dbSize.Add(uint64(delta))
|
||||
}
|
||||
|
||||
func (x *counters) DecDB() {
|
||||
x.cDB.Dec()
|
||||
func (x *counters) decDB(delta int) {
|
||||
x.dbSize.Sub(uint64(delta))
|
||||
}
|
||||
|
||||
func (x *counters) DB() uint64 {
|
||||
return x.cDB.Load()
|
||||
func (x *counters) incFS(delta int) {
|
||||
x.fstreeSize.Add(uint64(delta))
|
||||
}
|
||||
|
||||
func (x *counters) IncFS() {
|
||||
x.cFS.Inc()
|
||||
func (x *counters) decFS(delta int) {
|
||||
x.fstreeSize.Sub(uint64(delta))
|
||||
}
|
||||
|
||||
func (x *counters) DecFS() {
|
||||
x.cFS.Dec()
|
||||
func (c *Cache) initCounters(ctx context.Context) error {
|
||||
var wg sync.WaitGroup
|
||||
var dbErr error
|
||||
var fsErr error
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
dbErr = c.initDBSizeCounter(ctx)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
fsErr = c.initFSSizeCounter(ctx)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
switch {
|
||||
case dbErr != nil:
|
||||
return fmt.Errorf("database counter initialization: %w", dbErr)
|
||||
case fsErr != nil:
|
||||
return fmt.Errorf("FSTree counter initialization: %w", fsErr)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (x *counters) FS() uint64 {
|
||||
return x.cFS.Load()
|
||||
}
|
||||
var stopIter = errors.New("stop")
|
||||
|
||||
func (c *Cache) initCounters() error {
|
||||
var inDB uint64
|
||||
func (c *Cache) initDBSizeCounter(ctx context.Context) error {
|
||||
var inDB int
|
||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
if b != nil {
|
||||
inDB = uint64(b.Stats().KeyN)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
|
||||
return b.ForEach(func(_, v []byte) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-c.workersChan:
|
||||
return stopIter
|
||||
default:
|
||||
}
|
||||
|
||||
inDB += len(v)
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
if err != nil && !errors.Is(err, stopIter) {
|
||||
return fmt.Errorf("could not read write-cache DB counter: %w", err)
|
||||
}
|
||||
|
||||
inFS, err := c.fsTree.NumberOfObjects()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read write-cache FS counter: %w", err)
|
||||
}
|
||||
|
||||
c.objCounters.cDB.Store(inDB)
|
||||
c.objCounters.cFS.Store(inFS)
|
||||
c.objCounters.dbSize.Store(uint64(inDB))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) initFSSizeCounter(ctx context.Context) error {
|
||||
var inFSTree int
|
||||
|
||||
var prm common.IteratePrm
|
||||
prm.LazyHandler = func(address oid.Address, f func() ([]byte, error)) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-c.workersChan:
|
||||
return stopIter
|
||||
default:
|
||||
}
|
||||
|
||||
data, err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// write-cache is a temporary storage on a fast disk,
|
||||
// so it is not expected to be configured with any
|
||||
// compressor ever
|
||||
inFSTree += len(data)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Iterate(prm)
|
||||
if err != nil && !errors.Is(err, stopIter) {
|
||||
return fmt.Errorf("could not read write-cache FSTree counter: %w", err)
|
||||
}
|
||||
|
||||
c.objCounters.fstreeSize.Store(uint64(inFSTree))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,40 +1,18 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// store represents persistent storage with in-memory LRU cache
|
||||
// smallStore represents persistent storage with in-memory LRU cache
|
||||
// for flushed items on top of it.
|
||||
type store struct {
|
||||
maxFlushedMarksCount int
|
||||
maxRemoveBatchSize int
|
||||
|
||||
// flushed contains addresses of objects that were already flushed to the main storage.
|
||||
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
|
||||
// frequently read ones.
|
||||
// MUST NOT be used inside bolt db transaction because it's eviction handler
|
||||
// removes untracked items from the database.
|
||||
flushed simplelru.LRUCache[string, bool]
|
||||
db *bbolt.DB
|
||||
|
||||
dbKeysToRemove []string
|
||||
fsKeysToRemove []string
|
||||
type smallStore struct {
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
const dbName = "small.bolt"
|
||||
|
@ -69,101 +47,9 @@ func (c *Cache) openStore(readOnly bool) error {
|
|||
fstree.WithDepth(1),
|
||||
fstree.WithDirNameLen(1),
|
||||
fstree.WithNoSync(c.noSync))
|
||||
if err := c.fsTree.Open(readOnly); err != nil {
|
||||
if err = c.fsTree.Open(readOnly); err != nil {
|
||||
return fmt.Errorf("could not open FSTree: %w", err)
|
||||
}
|
||||
|
||||
// Write-cache can be opened multiple times during `SetMode`.
|
||||
// flushed map must not be re-created in this case.
|
||||
if c.flushed == nil {
|
||||
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
|
||||
}
|
||||
|
||||
c.initialized.Store(false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeFlushed removes an object from the writecache.
|
||||
// To minimize interference with the client operations, the actual removal
|
||||
// is done in batches.
|
||||
// It is not thread-safe and is used only as an evict callback to LRU cache.
|
||||
func (c *Cache) removeFlushed(key string, value bool) {
|
||||
fromDatabase := value
|
||||
if fromDatabase {
|
||||
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
|
||||
} else {
|
||||
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
|
||||
}
|
||||
|
||||
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
|
||||
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
|
||||
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) deleteFromDB(keys []string) []string {
|
||||
if len(keys) == 0 {
|
||||
return keys
|
||||
}
|
||||
|
||||
var errorIndex int
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
for errorIndex = range keys {
|
||||
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
for i := 0; i < errorIndex; i++ {
|
||||
c.objCounters.DecDB()
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(keys[i]),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||
}
|
||||
|
||||
copy(keys, keys[errorIndex:])
|
||||
return keys[:len(keys)-errorIndex]
|
||||
}
|
||||
|
||||
func (c *Cache) deleteFromDisk(keys []string) []string {
|
||||
if len(keys) == 0 {
|
||||
return keys
|
||||
}
|
||||
|
||||
var copyIndex int
|
||||
var addr oid.Address
|
||||
|
||||
for i := range keys {
|
||||
if err := addr.DecodeString(keys[i]); err != nil {
|
||||
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
|
||||
continue
|
||||
}
|
||||
|
||||
_, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
|
||||
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
||||
|
||||
// Save the key for the next iteration.
|
||||
keys[copyIndex] = keys[i]
|
||||
copyIndex++
|
||||
continue
|
||||
} else if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.AddressField(keys[i]),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("fstree DELETE"),
|
||||
)
|
||||
c.objCounters.DecFS()
|
||||
}
|
||||
}
|
||||
|
||||
return keys[:copyIndex]
|
||||
}
|
||||
|
|
|
@ -5,13 +5,11 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -25,27 +23,21 @@ type Info struct {
|
|||
type Cache struct {
|
||||
options
|
||||
|
||||
// mtx protects statistics, counters and compressFlags.
|
||||
mtx sync.RWMutex
|
||||
// objCounters contains atomic counters for the number of objects stored in cache.
|
||||
objCounters counters
|
||||
|
||||
mode mode.Mode
|
||||
initialized atomic.Bool
|
||||
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
|
||||
initWG sync.WaitGroup // for initialisation routines only
|
||||
modeMtx sync.RWMutex
|
||||
|
||||
// compressFlags maps address of a big object to boolean value indicating
|
||||
// whether object should be compressed.
|
||||
compressFlags map[string]struct{}
|
||||
modeMtx sync.RWMutex
|
||||
mode mode.Mode
|
||||
|
||||
// flushCh is a channel with objects to flush.
|
||||
flushCh chan *object.Object
|
||||
// closeCh is close channel, protected by modeMtx.
|
||||
closeCh chan struct{}
|
||||
smallFlushCh chan objWithData
|
||||
// workersChan is close channel, protected by modeMtx.
|
||||
// It indicates status of the background workers.
|
||||
workersChan chan struct{}
|
||||
// wg is a wait group for flush workers.
|
||||
wg sync.WaitGroup
|
||||
// store contains underlying database.
|
||||
store
|
||||
smallStore
|
||||
// fsTree contains big files stored directly on file-system.
|
||||
fsTree *fstree.FSTree
|
||||
}
|
||||
|
@ -70,13 +62,16 @@ var (
|
|||
)
|
||||
|
||||
// New creates new writecache instance.
|
||||
// The value must not be copied after creation.
|
||||
func New(opts ...Option) *Cache {
|
||||
c := &Cache{
|
||||
flushCh: make(chan *object.Object),
|
||||
mode: mode.ReadWrite,
|
||||
stopInitCh: make(chan struct{}),
|
||||
closeCh := make(chan struct{})
|
||||
close(closeCh)
|
||||
|
||||
c := &Cache{
|
||||
smallFlushCh: make(chan objWithData),
|
||||
mode: mode.ReadWrite,
|
||||
workersChan: closeCh,
|
||||
|
||||
compressFlags: make(map[string]struct{}),
|
||||
options: options{
|
||||
log: &logger.Logger{Logger: zap.NewNop()},
|
||||
maxObjectSize: defaultMaxObjectSize,
|
||||
|
@ -93,12 +88,6 @@ func New(opts ...Option) *Cache {
|
|||
opts[i](&c.options)
|
||||
}
|
||||
|
||||
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
|
||||
// Assume small and big objects are stored in 50-50 proportion.
|
||||
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
|
||||
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
|
||||
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
|
@ -120,40 +109,31 @@ func (c *Cache) Open(readOnly bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Opening after Close is done during maintenance mode,
|
||||
// thus we need to create a channel here.
|
||||
c.closeCh = make(chan struct{})
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
return c.initCounters()
|
||||
}
|
||||
if readOnly {
|
||||
c.mode = mode.ReadOnly
|
||||
} else {
|
||||
c.mode = mode.ReadWrite
|
||||
}
|
||||
|
||||
// Init runs necessary services.
|
||||
func (c *Cache) Init() error {
|
||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
|
||||
defer span.End()
|
||||
|
||||
c.initFlushMarks(ctx)
|
||||
c.runFlushLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *Cache) Close() error {
|
||||
// Finish all in-progress operations.
|
||||
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
|
||||
return err
|
||||
// Finish all in-progress operations if they are
|
||||
// in progress.
|
||||
select {
|
||||
case <-c.workersChan:
|
||||
default:
|
||||
err := c.setMode(context.TODO(), mode.ReadOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if c.closeCh != nil {
|
||||
close(c.closeCh)
|
||||
}
|
||||
c.wg.Wait()
|
||||
if c.closeCh != nil {
|
||||
c.closeCh = nil
|
||||
}
|
||||
|
||||
c.initialized.Store(false)
|
||||
|
||||
var err error
|
||||
if c.db != nil {
|
||||
err = c.db.Close()
|
||||
|
|
Loading…
Reference in a new issue
wow. looks too complex.