Compare commits

...

4 commits

Author SHA1 Message Date
Pavel Karpy
34544502dc [#314] wc: Simplify WC
Do not use write-cache as a read cache: always remove objects from the
WC, not only if an object hasn't been used for some time (LRU cache is
dropped). Use object size (in bytes) as a metric of used space, not an
approximate (and too inaccurate) maximum stored objects number.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-05 21:05:31 +03:00
Pavel Karpy
99f76599e8 [#314] wc: Do not lose small objects on disk errors
Do return error if an object could not been stored on WC's disk.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-05 21:05:31 +03:00
Pavel Karpy
1e6ffd45ad [#314] wc: Simplify background workers naming
Also, drop not used arg.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-05 21:05:31 +03:00
Pavel Karpy
77b1b80e73 [#314] wc: Drop WC interface
It is not used for testing, its absence does not break build. The only one
implementation is placed in the same package.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-05 21:05:31 +03:00
16 changed files with 470 additions and 580 deletions

View file

@ -91,7 +91,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return res.Object, nil return res.Object, nil
} }
wc := func(c writecache.Cache) (*objectSDK.Object, error) { wc := func(c *writecache.Cache) (*objectSDK.Object, error) {
return c.Get(ctx, prm.addr) return c.Get(ctx, prm.addr)
} }
@ -109,7 +109,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
var emptyStorageID = make([]byte, 0) var emptyStorageID = make([]byte, 0)
// fetchObjectData looks through writeCache and blobStor to find object. // fetchObjectData looks through writeCache and blobStor to find object.
func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) { func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w *writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
var ( var (
mErr error mErr error
mRes meta.ExistsRes mRes meta.ExistsRes

View file

@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
func (m Mode) ReadOnly() bool { func (m Mode) ReadOnly() bool {
return m&ReadOnly != 0 return m&ReadOnly != 0
} }
func (m Mode) ReadWrite() bool {
return m == 0
}

View file

@ -104,7 +104,7 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return obj, nil return obj, nil
} }
wc := func(c writecache.Cache) (*object.Object, error) { wc := func(c *writecache.Cache) (*object.Object, error) {
res, err := c.Get(ctx, prm.addr) res, err := c.Get(ctx, prm.addr)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -23,7 +23,7 @@ type Shard struct {
gc *gc gc *gc
writeCache writecache.Cache writeCache *writecache.Cache
blobStor *blobstor.BlobStor blobStor *blobstor.BlobStor

View file

@ -2,10 +2,12 @@ package writecache
import ( import (
"context" "context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -15,7 +17,7 @@ import (
// Delete removes object from write-cache. // Delete removes object from write-cache.
// //
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache. // Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
func (c *cache) Delete(ctx context.Context, addr oid.Address) error { func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes( trace.WithAttributes(
attribute.String("address", addr.EncodeToString()), attribute.String("address", addr.EncodeToString()),
@ -31,14 +33,14 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
saddr := addr.EncodeToString() saddr := addr.EncodeToString()
// Check disk cache. // Check disk cache.
var has int var valLen int
_ = c.db.View(func(tx *bbolt.Tx) error { _ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
has = len(b.Get([]byte(saddr))) valLen = len(b.Get([]byte(saddr)))
return nil return nil
}) })
if 0 < has { if valLen > 0 {
err := c.db.Update(func(tx *bbolt.Tx) error { err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
err := b.Delete([]byte(saddr)) err := b.Delete([]byte(saddr))
@ -52,18 +54,32 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"), storagelog.OpField("db DELETE"),
) )
c.objCounters.DecDB() c.objCounters.decDB(valLen)
return nil return nil
} }
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) // While getting an object looks overheadly, it allows to
// get its size correctly without any additional memory/disk/CPU
// usage on the WC's side _for every object_. `Delete` is not
// expected to be called right after an object is put to the
// Write-cache often, and for non-existing objects (persisted
// to the main storage and dropped from the WC's storage) it
// is `os.Stat` vs `os.Remove` calls after all.
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if errors.As(err, new(apistatus.ObjectNotFound)) {
return nil
} else if err != nil {
return err
}
_, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err == nil { if err == nil {
storagelog.Write(c.log, storagelog.Write(c.log,
storagelog.AddressField(saddr), storagelog.AddressField(saddr),
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"), storagelog.OpField("fstree DELETE"),
) )
c.objCounters.DecFS() c.objCounters.decFS(len(res.RawData))
} }
return err return err

View file

@ -11,7 +11,9 @@ import (
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
@ -33,11 +35,16 @@ const (
defaultFlushInterval = time.Second defaultFlushInterval = time.Second
) )
type objWithData struct {
obj *object.Object
data []byte
}
// runFlushLoop starts background workers which periodically flush objects to the blobstor. // runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() { func (c *Cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ { for i := 0; i < c.workersCount; i++ {
c.wg.Add(1) c.wg.Add(1)
go c.flushWorker(i) go c.smallObjectsFlusher()
} }
c.wg.Add(1) c.wg.Add(1)
@ -56,34 +63,27 @@ func (c *cache) runFlushLoop() {
for { for {
select { select {
case <-tt.C: case <-tt.C:
c.flushDB() c.flushSmallObjects()
tt.Reset(defaultFlushInterval) tt.Reset(defaultFlushInterval)
case <-c.closeCh: case <-c.workersChan:
return return
} }
} }
}() }()
} }
func (c *cache) flushDB() { func (c *Cache) flushSmallObjects() {
var lastKey []byte var lastKey []byte
var m []objectInfo var m []objectInfo
for { for {
select { select {
case <-c.closeCh: case <-c.workersChan:
return return
default: default:
} }
m = m[:0] m = m[:0]
c.modeMtx.RLock()
if c.readOnly() || !c.initialized.Load() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
}
// We put objects in batches of fixed size to not interfere with main put cycle a lot. // We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error { _ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
@ -117,38 +117,32 @@ func (c *cache) flushDB() {
var count int var count int
for i := range m { for i := range m {
if c.flushed.Contains(m[i].addr) {
continue
}
obj := object.New() obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil { data := m[i].data
if err := obj.Unmarshal(data); err != nil {
continue continue
} }
count++ count++
select { select {
case c.flushCh <- obj: case c.smallFlushCh <- objWithData{obj: obj, data: data}:
case <-c.closeCh: case <-c.workersChan:
c.modeMtx.RUnlock()
return return
} }
} }
if count == 0 { if count == 0 {
c.modeMtx.RUnlock()
break break
} }
c.modeMtx.RUnlock()
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count), zap.Int("count", count),
zap.String("start", base58.Encode(lastKey))) zap.String("start", base58.Encode(lastKey)))
} }
} }
func (c *cache) flushBigObjects(ctx context.Context) { func (c *Cache) flushBigObjects(ctx context.Context) {
tick := time.NewTicker(defaultFlushInterval * 10) tick := time.NewTicker(defaultFlushInterval * 10)
for { for {
select { select {
@ -157,21 +151,18 @@ func (c *cache) flushBigObjects(ctx context.Context) {
if c.readOnly() { if c.readOnly() {
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
break break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
} }
_ = c.flushFSTree(ctx, true) _ = c.flushFSTree(ctx, true)
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
case <-c.closeCh: case <-c.workersChan:
return return
} }
} }
} }
func (c *cache) reportFlushError(msg string, addr string, err error) { func (c *Cache) reportFlushError(msg string, addr string, err error) {
if c.reportError != nil { if c.reportError != nil {
c.reportError(msg, err) c.reportError(msg, err)
} else { } else {
@ -181,14 +172,18 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
} }
} }
func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
var prm common.IteratePrm var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString() sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok { select {
return nil case <-c.workersChan:
return stopIter
case <-ctx.Done():
return ctx.Err()
default:
} }
data, err := f() data, err := f()
@ -210,7 +205,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
err = c.flushObject(ctx, &obj, data) err = c.flushObject(ctx, objWithData{obj: &obj, data: data})
if err != nil { if err != nil {
if ignoreErrors { if ignoreErrors {
return nil return nil
@ -218,38 +213,56 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
// mark object as flushed err = c.dropBigObject(ctx, addr, len(data))
c.flushed.Add(sAddr, false) if err != nil {
c.reportFlushError("can't drop an object from FSTree", sAddr, err)
if ignoreErrors {
return nil
}
return err
}
return nil return nil
} }
_, err := c.fsTree.Iterate(prm) _, err := c.fsTree.Iterate(prm)
if errors.Is(err, stopIter) {
return nil
}
return err return err
} }
// flushWorker writes objects to the main storage. // smallObjectsFlusher writes small objects to the main storage.
func (c *cache) flushWorker(_ int) { func (c *Cache) smallObjectsFlusher() {
defer c.wg.Done() defer c.wg.Done()
var obj *object.Object var objAndData objWithData
for { for {
// Give priority to direct put. // Give priority to direct put.
select { select {
case obj = <-c.flushCh: case objAndData = <-c.smallFlushCh:
case <-c.closeCh: case <-c.workersChan:
return return
} }
err := c.flushObject(context.TODO(), obj, nil) err := c.flushObject(context.TODO(), objAndData)
if err == nil { if err == nil {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true) addr := objectCore.AddressOf(objAndData.obj)
err = c.dropSmallObject(context.TODO(), addr)
if err != nil {
c.reportFlushError("can't drop object from write-cache",
addr.EncodeToString(), err)
}
} }
} }
} }
// flushObject is used to write object directly to the main storage. // flushObject is used to write object directly to the main storage.
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error { func (c *Cache) flushObject(ctx context.Context, objAndData objWithData) error {
obj := objAndData.obj
data := objAndData.data
addr := objectCore.AddressOf(obj) addr := objectCore.AddressOf(obj)
var prm common.PutPrm var prm common.PutPrm
@ -272,6 +285,11 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte
_, err = c.metabase.UpdateStorageID(updPrm) _, err = c.metabase.UpdateStorageID(updPrm)
if err != nil { if err != nil {
if errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) {
// object info is outdated in the WC
return nil
}
c.reportFlushError("can't update object storage ID", c.reportFlushError("can't update object storage ID",
addr.EncodeToString(), err) addr.EncodeToString(), err)
} }
@ -281,7 +299,7 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte
// Flush flushes all objects from the write-cache to the main storage. // Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and // Write-cache must be in readonly mode to ensure correctness of an operation and
// to prevent interference with background flush workers. // to prevent interference with background flush workers.
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { func (c *Cache) Flush(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
trace.WithAttributes( trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors), attribute.Bool("ignore_errors", ignoreErrors),
@ -294,21 +312,25 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
return c.flush(ctx, ignoreErrors) return c.flush(ctx, ignoreErrors)
} }
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
if err := c.flushFSTree(ctx, ignoreErrors); err != nil { if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
return err return err
} }
return c.db.View(func(tx *bbolt.Tx) error { var dbFunc func(func(*bbolt.Tx) error) error
if c.readOnly() {
dbFunc = c.db.View
} else {
dbFunc = c.db.Update
}
return dbFunc(func(tx *bbolt.Tx) error {
var addr oid.Address var addr oid.Address
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
cs := b.Cursor() cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k) sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}
if err := addr.DecodeString(sa); err != nil { if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err) c.reportFlushError("can't decode object address from the DB", sa, err)
@ -327,10 +349,101 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
if err := c.flushObject(ctx, &obj, data); err != nil { err := c.flushObject(ctx, objWithData{obj: &obj, data: data})
if err != nil {
if ignoreErrors {
continue
}
return err return err
} }
if c.readOnly() {
continue
}
removed, err := dropObject(tx, k)
if err != nil {
c.reportFlushError("can't drop an object from the DB", sa, err)
if ignoreErrors {
continue
}
}
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
c.objCounters.decDB(removed)
} }
return nil return nil
}) })
} }
func (c *Cache) dropSmallObject(ctx context.Context, addr oid.Address) error {
var removedBytes int
key := []byte(addr.EncodeToString())
var err error
err = c.db.Batch(func(tx *bbolt.Tx) error {
select {
case <-c.workersChan:
return nil
case <-ctx.Done():
return ctx.Err()
default:
}
removedBytes, err = dropObject(tx, key)
return err
})
if err != nil {
return err
}
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
if removedBytes > 0 {
c.objCounters.decDB(removedBytes)
}
return nil
}
func dropObject(tx *bbolt.Tx, key []byte) (int, error) {
b := tx.Bucket(defaultBucket)
removedBytes := len(b.Get(key))
if removedBytes > 0 {
return removedBytes, b.Delete(key)
}
return 0, nil
}
func (c *Cache) dropBigObject(ctx context.Context, addr oid.Address, size int) error {
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil {
if errors.As(err, new(apistatus.ObjectNotFound)) {
return nil
}
return err
}
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
c.objCounters.decFS(size)
return nil
}

View file

@ -5,7 +5,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -39,7 +38,7 @@ func TestFlush(t *testing.T) {
smallSize = 256 smallSize = 256
) )
newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) { newCache := func(t *testing.T, opts ...Option) (*Cache, *blobstor.BlobStor, *meta.DB) {
dir := t.TempDir() dir := t.TempDir()
mb := meta.New( mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")), meta.WithPath(filepath.Join(dir, "meta")),
@ -76,7 +75,7 @@ func TestFlush(t *testing.T) {
return wc, bs, mb return wc, bs, mb
} }
putObjects := func(t *testing.T, c Cache) []objectPair { putObjects := func(t *testing.T, c *Cache) []objectPair {
objects := make([]objectPair, objCount) objects := make([]objectPair, objCount)
for i := range objects { for i := range objects {
objects[i] = putObject(t, c, 1+(i%2)*smallSize) objects[i] = putObject(t, c, 1+(i%2)*smallSize)
@ -106,12 +105,19 @@ func TestFlush(t *testing.T) {
wc, bs, mb := newCache(t) wc, bs, mb := newCache(t)
objects := putObjects(t, wc) objects := putObjects(t, wc)
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(context.Background(), mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(context.Background(), false)) require.NoError(t, wc.Flush(context.Background(), false))
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
@ -121,7 +127,7 @@ func TestFlush(t *testing.T) {
require.Error(t, err) require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err) require.NoError(t, err)
} }
check(t, mb, bs, objects[2:]) check(t, mb, bs, objects[2:])
@ -131,19 +137,11 @@ func TestFlush(t *testing.T) {
wc, bs, mb := newCache(t) wc, bs, mb := newCache(t)
objects := putObjects(t, wc) objects := putObjects(t, wc)
// Blobstor is read-only, so we expect en error from `flush` here. // Moving to the degraded mode is called with `ignoreErrors` so
require.Error(t, wc.SetMode(mode.Degraded)) // we do not expect an error from `flush` here.
// First move to read-only mode to close background workers.
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded)) require.NoError(t, wc.SetMode(mode.Degraded))
// bs is read-only; so is can't get the objects
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
var mPrm meta.GetPrm var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr) mPrm.SetAddress(objects[i].addr)
@ -154,19 +152,29 @@ func TestFlush(t *testing.T) {
require.Error(t, err) require.Error(t, err)
} }
require.NoError(t, wc.SetMode(mode.ReadWrite))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded))
for i := 0; i < 2; i++ {
_, err := bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.NoError(t, err)
}
check(t, mb, bs, objects[2:]) check(t, mb, bs, objects[2:])
}) })
t.Run("ignore errors", func(t *testing.T) { t.Run("ignore errors", func(t *testing.T) {
testIgnoreErrors := func(t *testing.T, f func(*cache)) { testIgnoreErrors := func(t *testing.T, f func(*Cache)) {
var errCount atomic.Uint32 var errCount atomic.Uint32
wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) { wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) {
errCount.Inc() errCount.Inc()
})) }))
objects := putObjects(t, wc) objects := putObjects(t, wc)
f(wc.(*cache)) f(wc)
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
@ -178,7 +186,7 @@ func TestFlush(t *testing.T) {
check(t, mb, bs, objects) check(t, mb, bs, objects)
} }
t.Run("db, invalid address", func(t *testing.T) { t.Run("db, invalid address", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) { testIgnoreErrors(t, func(c *Cache) {
_, data := newObject(t, 1) _, data := newObject(t, 1)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
@ -187,7 +195,7 @@ func TestFlush(t *testing.T) {
}) })
}) })
t.Run("db, invalid object", func(t *testing.T) { t.Run("db, invalid object", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) { testIgnoreErrors(t, func(c *Cache) {
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error { require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3}) return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
@ -195,7 +203,7 @@ func TestFlush(t *testing.T) {
}) })
}) })
t.Run("fs, read error", func(t *testing.T) { t.Run("fs, read error", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) { testIgnoreErrors(t, func(c *Cache) {
obj, data := newObject(t, 1) obj, data := newObject(t, 1)
var prm common.PutPrm var prm common.PutPrm
@ -214,7 +222,7 @@ func TestFlush(t *testing.T) {
}) })
}) })
t.Run("fs, invalid object", func(t *testing.T) { t.Run("fs, invalid object", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) { testIgnoreErrors(t, func(c *Cache) {
var prm common.PutPrm var prm common.PutPrm
prm.Address = oidtest.Address() prm.Address = oidtest.Address()
prm.RawData = []byte{1, 2, 3} prm.RawData = []byte{1, 2, 3}
@ -224,7 +232,7 @@ func TestFlush(t *testing.T) {
}) })
}) })
t.Run("on init", func(t *testing.T) { t.Run("flush", func(t *testing.T) {
wc, bs, mb := newCache(t) wc, bs, mb := newCache(t)
objects := []objectPair{ objects := []objectPair{
// removed // removed
@ -260,9 +268,6 @@ func TestFlush(t *testing.T) {
_, err = mb.Delete(context.Background(), deletePrm) _, err = mb.Delete(context.Background(), deletePrm)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed. // Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true)) require.NoError(t, wc.Open(true))
initWC(t, wc) initWC(t, wc)
@ -275,18 +280,22 @@ func TestFlush(t *testing.T) {
// Open in read-write: no error, something is removed. // Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false)) require.NoError(t, wc.Open(false))
initWC(t, wc) initWC(t, wc)
for i := range objects { for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr) _, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i) require.NoError(t, err, i)
} }
require.NoError(t, wc.Flush(context.Background(), true))
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} }
}) })
} }
func putObject(t *testing.T, c Cache, size int) objectPair { func putObject(t *testing.T, c *Cache, size int) objectPair {
obj, data := newObject(t, size) obj, data := newObject(t, size)
var prm common.PutPrm var prm common.PutPrm
@ -319,13 +328,8 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
return obj, data return obj, data
} }
func initWC(t *testing.T, wc Cache) { func initWC(t *testing.T, wc *Cache) {
require.NoError(t, wc.Init()) require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
} }
type dummyEpoch struct{} type dummyEpoch struct{}

View file

@ -18,7 +18,7 @@ import (
// Get returns object from write-cache. // Get returns object from write-cache.
// //
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString() saddr := addr.EncodeToString()
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr)) value, err := Get(c.db, []byte(saddr))
if err == nil { if err == nil {
obj := objectSDK.New() obj := objectSDK.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value) return obj, obj.Unmarshal(value)
} }
@ -39,14 +38,13 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
} }
c.flushed.Get(saddr)
return res.Object, nil return res.Object, nil
} }
// Head returns object header from write-cache. // Head returns object header from write-cache.
// //
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { func (c *Cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
trace.WithAttributes( trace.WithAttributes(
attribute.String("address", addr.EncodeToString()), attribute.String("address", addr.EncodeToString()),

View file

@ -2,191 +2,33 @@ package writecache
import ( import (
"context" "context"
"errors" "fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
) )
func (c *cache) initFlushMarks(ctx context.Context) { // Init runs necessary services.
var localWG sync.WaitGroup func (c *Cache) Init() error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
defer span.End()
localWG.Add(1) c.modeMtx.Lock()
go func() { defer c.modeMtx.Unlock()
defer localWG.Done()
c.fsTreeFlushMarkUpdate(ctx) if c.mode.NoMetabase() {
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate(ctx)
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(ctx, addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(ctx, prm)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
}
}
}
return nil return nil
} }
c.modeMtx.RLock() err := c.initCounters(ctx)
defer c.modeMtx.RUnlock()
_, _ = c.fsTree.Iterate(prm)
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
}
func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
m = append(m, string(k))
}
return nil
})
var addr oid.Address
for i := range m {
if err := addr.DecodeString(m[i]); err != nil {
continue
}
flushed, needRemove := c.flushStatus(ctx, addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log,
zap.String("address", m[j]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
}
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(ctx, existsPrm)
if err != nil { if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) return fmt.Errorf("initializing write-cache size: %w", err)
return needRemove, needRemove
} }
var prm meta.StorageIDPrm if c.mode == mode.ReadWrite {
prm.SetAddress(addr) c.workersChan = make(chan struct{})
c.runFlushLoop()
}
mRes, _ := c.metabase.StorageID(ctx, prm) return nil
res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
} }

View file

@ -31,7 +31,7 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
// Iterate iterates over all objects present in write cache. // Iterate iterates over all objects present in write cache.
// This is very difficult to do correctly unless write-cache is put in read-only mode. // This is very difficult to do correctly unless write-cache is put in read-only mode.
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results. // Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
func (c *cache) Iterate(prm IterationPrm) error { func (c *Cache) Iterate(prm IterationPrm) error {
c.modeMtx.RLock() c.modeMtx.RLock()
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
if !c.readOnly() { if !c.readOnly() {
@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error { err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error { return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return prm.handler(data) return prm.handler(data)
}) })
}) })
@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f() data, err := f()
if err != nil { if err != nil {
if prm.ignoreErrors { if prm.ignoreErrors {

View file

@ -16,13 +16,10 @@ import (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode. // ErrReadOnly is returned when Put/Write is performed in a read-only mode.
var ErrReadOnly = logicerr.New("write-cache is in read-only mode") var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
// ErrNotInitialized is returned when write-cache is initializing.
var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
// SetMode sets write-cache mode of operation. // SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk // When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended. // and all background jobs are suspended.
func (c *cache) SetMode(m mode.Mode) error { func (c *Cache) SetMode(m mode.Mode) error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode", ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
trace.WithAttributes( trace.WithAttributes(
attribute.String("mode", m.String()), attribute.String("mode", m.String()),
@ -33,31 +30,30 @@ func (c *cache) SetMode(m mode.Mode) error {
} }
// setMode applies new mode. Must be called with cache.modeMtx lock taken. // setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(ctx context.Context, m mode.Mode) error { func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
var err error var err error
turnOffMeta := m.NoMetabase()
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks(ctx)
}
}()
}
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
if turnOffMeta && !c.mode.NoMetabase() { var workersActive bool
select {
case <-c.workersChan:
default:
workersActive = true
}
stopWorkers := m.NoMetabase() && !c.mode.NoMetabase() || c.mode.ReadWrite() && !m.ReadWrite()
if stopWorkers {
err = c.flush(ctx, true) err = c.flush(ctx, true)
if err != nil { if err != nil {
return err return err
} }
if workersActive {
close(c.workersChan)
c.wg.Wait()
}
} }
if c.db != nil { if c.db != nil {
@ -67,14 +63,14 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
} }
// Suspend producers to ensure there are channel send operations in fly. // Suspend producers to ensure there are channel send operations in fly.
// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty // smallFlushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
// guarantees that there are no in-fly operations. // guarantees that there are no in-fly operations.
for len(c.flushCh) != 0 { for len(c.smallFlushCh) != 0 {
c.log.Info(logs.WritecacheWaitingForChannelsToFlush) c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
time.Sleep(time.Second) time.Sleep(time.Second)
} }
if turnOffMeta { if m.NoMetabase() {
c.mode = m c.mode = m
return nil return nil
} }
@ -84,11 +80,21 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
} }
c.mode = m c.mode = m
if m == mode.ReadWrite {
select {
case <-c.workersChan:
c.workersChan = make(chan struct{})
c.runFlushLoop()
default:
}
}
return nil return nil
} }
// readOnly returns true if current mode is read-only. // readOnly returns true if current mode is read-only.
// `c.modeMtx` must be taken. // `c.modeMtx` must be taken.
func (c *cache) readOnly() bool { func (c *Cache) readOnly() bool {
return c.mode.ReadOnly() return c.mode.ReadOnly()
} }

View file

@ -48,8 +48,6 @@ type options struct {
// maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
// 1 GiB by default. // 1 GiB by default.
maxCacheSize uint64 maxCacheSize uint64
// objCounters contains atomic counters for the number of objects stored in cache.
objCounters counters
// maxBatchSize is the maximum batch size for the small object database. // maxBatchSize is the maximum batch size for the small object database.
maxBatchSize int maxBatchSize int
// maxBatchDelay is the maximum batch wait time for the small object database. // maxBatchDelay is the maximum batch wait time for the small object database.

View file

@ -3,6 +3,7 @@ package writecache
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -25,7 +26,7 @@ var (
// Returns ErrNotInitialized if write-cache has not been initialized yet. // Returns ErrNotInitialized if write-cache has not been initialized yet.
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow. // Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
// Returns ErrBigObject if an objects exceeds maximum object size. // Returns ErrBigObject if an objects exceeds maximum object size.
func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put",
trace.WithAttributes( trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()), attribute.String("address", prm.Address.EncodeToString()),
@ -37,8 +38,6 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
if c.readOnly() { if c.readOnly() {
return common.PutRes{}, ErrReadOnly return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
} }
sz := uint64(len(prm.RawData)) sz := uint64(len(prm.RawData))
@ -52,23 +51,42 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
data: prm.RawData, data: prm.RawData,
} }
if sz <= c.smallObjectSize { if c.maxCacheSize < c.sizeIfAdd(sz) {
return common.PutRes{}, c.putSmall(oi) return common.PutRes{}, ErrOutOfSpace
} }
return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
if sz <= c.smallObjectSize {
err := c.putSmall(oi)
if err != nil {
err = fmt.Errorf("could not put small object to DB: %w", err)
}
return common.PutRes{}, err
}
err := c.putBig(ctx, oi.addr, prm)
if err != nil {
err = fmt.Errorf("could not put big object to FSTree: %w", err)
}
return common.PutRes{}, err
} }
// putSmall persists small objects to the write-cache database and // putSmall persists small objects to the write-cache database and
// pushes the to the flush workers queue. // pushes the to the flush workers queue.
func (c *cache) putSmall(obj objectInfo) error { func (c *Cache) putSmall(obj objectInfo) error {
cacheSize := c.estimateCacheSize() var alreadyExists bool
if c.maxCacheSize < c.incSizeDB(cacheSize) {
return ErrOutOfSpace
}
err := c.db.Batch(func(tx *bbolt.Tx) error { err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
return b.Put([]byte(obj.addr), obj.data) addr := []byte(obj.addr)
alreadyExists = len(b.Get(addr)) != 0
if alreadyExists {
return nil
}
return b.Put(addr, obj.data)
}) })
if err == nil { if err == nil {
storagelog.Write(c.log, storagelog.Write(c.log,
@ -76,29 +94,22 @@ func (c *cache) putSmall(obj objectInfo) error {
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"), storagelog.OpField("db PUT"),
) )
c.objCounters.IncDB()
if !alreadyExists {
c.objCounters.incDB(len(obj.data))
} }
return nil }
return err
} }
// putBig writes object to FSTree and pushes it to the flush workers queue. // putBig writes object to FSTree and pushes it to the flush workers queue.
func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
cacheSz := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeFS(cacheSz) {
return ErrOutOfSpace
}
_, err := c.fsTree.Put(ctx, prm) _, err := c.fsTree.Put(ctx, prm)
if err != nil { if err != nil {
return err return err
} }
if c.blobstor.NeedsCompression(prm.Object) { c.objCounters.incFS(len(prm.RawData))
c.mtx.Lock()
c.compressFlags[addr] = struct{}{}
c.mtx.Unlock()
}
c.objCounters.IncFS()
storagelog.Write(c.log, storagelog.Write(c.log,
storagelog.AddressField(addr), storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),

View file

@ -1,72 +1,134 @@
package writecache package writecache
import ( import (
"context"
"errors"
"fmt" "fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
func (c *cache) estimateCacheSize() uint64 { func (c *Cache) sizeIfAdd(delta uint64) uint64 {
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize return delta + c.objCounters.fstreeSize.Load() + c.objCounters.dbSize.Load()
}
func (c *cache) incSizeDB(sz uint64) uint64 {
return sz + c.smallObjectSize
}
func (c *cache) incSizeFS(sz uint64) uint64 {
return sz + c.maxObjectSize
} }
type counters struct { type counters struct {
cDB, cFS atomic.Uint64 dbSize, fstreeSize atomic.Uint64
} }
func (x *counters) IncDB() { func (x *counters) incDB(delta int) {
x.cDB.Inc() x.dbSize.Add(uint64(delta))
} }
func (x *counters) DecDB() { func (x *counters) decDB(delta int) {
x.cDB.Dec() x.dbSize.Sub(uint64(delta))
} }
func (x *counters) DB() uint64 { func (x *counters) incFS(delta int) {
return x.cDB.Load() x.fstreeSize.Add(uint64(delta))
} }
func (x *counters) IncFS() { func (x *counters) decFS(delta int) {
x.cFS.Inc() x.fstreeSize.Sub(uint64(delta))
} }
func (x *counters) DecFS() { func (c *Cache) initCounters(ctx context.Context) error {
x.cFS.Dec() var wg sync.WaitGroup
var dbErr error
var fsErr error
wg.Add(1)
go func() {
dbErr = c.initDBSizeCounter(ctx)
wg.Done()
}()
wg.Add(1)
go func() {
fsErr = c.initFSSizeCounter(ctx)
wg.Done()
}()
wg.Wait()
switch {
case dbErr != nil:
return fmt.Errorf("database counter initialization: %w", dbErr)
case fsErr != nil:
return fmt.Errorf("FSTree counter initialization: %w", fsErr)
default:
return nil
}
} }
func (x *counters) FS() uint64 { var stopIter = errors.New("stop")
return x.cFS.Load()
}
func (c *cache) initCounters() error { func (c *Cache) initDBSizeCounter(ctx context.Context) error {
var inDB uint64 var inDB int
err := c.db.View(func(tx *bbolt.Tx) error { err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
if b != nil { if b == nil {
inDB = uint64(b.Stats().KeyN) return nil
} }
return b.ForEach(func(_, v []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.workersChan:
return stopIter
default:
}
inDB += len(v)
return nil return nil
}) })
if err != nil { })
if err != nil && !errors.Is(err, stopIter) {
return fmt.Errorf("could not read write-cache DB counter: %w", err) return fmt.Errorf("could not read write-cache DB counter: %w", err)
} }
inFS, err := c.fsTree.NumberOfObjects() c.objCounters.dbSize.Store(uint64(inDB))
if err != nil {
return fmt.Errorf("could not read write-cache FS counter: %w", err) return nil
} }
c.objCounters.cDB.Store(inDB) func (c *Cache) initFSSizeCounter(ctx context.Context) error {
c.objCounters.cFS.Store(inFS) var inFSTree int
var prm common.IteratePrm
prm.LazyHandler = func(address oid.Address, f func() ([]byte, error)) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.workersChan:
return stopIter
default:
}
data, err := f()
if err != nil {
return err
}
// write-cache is a temporary storage on a fast disk,
// so it is not expected to be configured with any
// compressor ever
inFSTree += len(data)
return nil
}
_, err := c.fsTree.Iterate(prm)
if err != nil && !errors.Is(err, stopIter) {
return fmt.Errorf("could not read write-cache FSTree counter: %w", err)
}
c.objCounters.fstreeSize.Store(uint64(inFSTree))
return nil return nil
} }

View file

@ -1,45 +1,23 @@
package writecache package writecache
import ( import (
"context"
"errors"
"fmt" "fmt"
"os" "os"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/zap"
) )
// store represents persistent storage with in-memory LRU cache // smallStore represents persistent storage with in-memory LRU cache
// for flushed items on top of it. // for flushed items on top of it.
type store struct { type smallStore struct {
maxFlushedMarksCount int
maxRemoveBatchSize int
// flushed contains addresses of objects that were already flushed to the main storage.
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
// frequently read ones.
// MUST NOT be used inside bolt db transaction because it's eviction handler
// removes untracked items from the database.
flushed simplelru.LRUCache[string, bool]
db *bbolt.DB db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
} }
const dbName = "small.bolt" const dbName = "small.bolt"
func (c *cache) openStore(readOnly bool) error { func (c *Cache) openStore(readOnly bool) error {
err := util.MkdirAllX(c.path, os.ModePerm) err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil { if err != nil {
return err return err
@ -69,101 +47,9 @@ func (c *cache) openStore(readOnly bool) error {
fstree.WithDepth(1), fstree.WithDepth(1),
fstree.WithDirNameLen(1), fstree.WithDirNameLen(1),
fstree.WithNoSync(c.noSync)) fstree.WithNoSync(c.noSync))
if err := c.fsTree.Open(readOnly); err != nil { if err = c.fsTree.Open(readOnly); err != nil {
return fmt.Errorf("could not open FSTree: %w", err) return fmt.Errorf("could not open FSTree: %w", err)
} }
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil return nil
} }
// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
} else {
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys
}
var errorIndex int
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for errorIndex = range keys {
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
return err
}
}
return nil
})
for i := 0; i < errorIndex; i++ {
c.objCounters.DecDB()
storagelog.Write(c.log,
storagelog.AddressField(keys[i]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
if err != nil {
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
}
copy(keys, keys[errorIndex:])
return keys[:len(keys)-errorIndex]
}
func (c *cache) deleteFromDisk(keys []string) []string {
if len(keys) == 0 {
return keys
}
var copyIndex int
var addr oid.Address
for i := range keys {
if err := addr.DecodeString(keys[i]); err != nil {
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
continue
}
_, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
// Save the key for the next iteration.
keys[copyIndex] = keys[i]
copyIndex++
continue
} else if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(keys[i]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
c.objCounters.DecFS()
}
}
return keys[:copyIndex]
}

View file

@ -5,15 +5,11 @@ import (
"os" "os"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -24,52 +20,24 @@ type Info struct {
} }
// Cache represents write-cache for objects. // Cache represents write-cache for objects.
type Cache interface { type Cache struct {
Get(ctx context.Context, address oid.Address) (*object.Object, error)
Head(context.Context, oid.Address) (*object.Object, error)
// Delete removes object referenced by the given oid.Address from the
// Cache. Returns any error encountered that prevented the object to be
// removed.
//
// Returns apistatus.ObjectNotFound if object is missing in the Cache.
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
Delete(context.Context, oid.Address) error
Iterate(IterationPrm) error
Put(context.Context, common.PutPrm) (common.PutRes, error)
SetMode(mode.Mode) error
SetLogger(*logger.Logger)
DumpInfo() Info
Flush(context.Context, bool) error
Init() error
Open(readOnly bool) error
Close() error
}
type cache struct {
options options
// mtx protects statistics, counters and compressFlags. // objCounters contains atomic counters for the number of objects stored in cache.
mtx sync.RWMutex objCounters counters
mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex modeMtx sync.RWMutex
mode mode.Mode
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
compressFlags map[string]struct{}
// flushCh is a channel with objects to flush. // flushCh is a channel with objects to flush.
flushCh chan *object.Object smallFlushCh chan objWithData
// closeCh is close channel, protected by modeMtx. // workersChan is close channel, protected by modeMtx.
closeCh chan struct{} // It indicates status of the background workers.
workersChan chan struct{}
// wg is a wait group for flush workers. // wg is a wait group for flush workers.
wg sync.WaitGroup wg sync.WaitGroup
// store contains underlying database. // store contains underlying database.
store smallStore
// fsTree contains big files stored directly on file-system. // fsTree contains big files stored directly on file-system.
fsTree *fstree.FSTree fsTree *fstree.FSTree
} }
@ -94,13 +62,16 @@ var (
) )
// New creates new writecache instance. // New creates new writecache instance.
func New(opts ...Option) Cache { // The value must not be copied after creation.
c := &cache{ func New(opts ...Option) *Cache {
flushCh: make(chan *object.Object), closeCh := make(chan struct{})
mode: mode.ReadWrite, close(closeCh)
stopInitCh: make(chan struct{}),
c := &Cache{
smallFlushCh: make(chan objWithData),
mode: mode.ReadWrite,
workersChan: closeCh,
compressFlags: make(map[string]struct{}),
options: options{ options: options{
log: &logger.Logger{Logger: zap.NewNop()}, log: &logger.Logger{Logger: zap.NewNop()},
maxObjectSize: defaultMaxObjectSize, maxObjectSize: defaultMaxObjectSize,
@ -117,66 +88,51 @@ func New(opts ...Option) Cache {
opts[i](&c.options) opts[i](&c.options)
} }
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
// Assume small and big objects are stored in 50-50 proportion.
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
return c return c
} }
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. // SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (c *cache) SetLogger(l *logger.Logger) { func (c *Cache) SetLogger(l *logger.Logger) {
c.log = l c.log = l
} }
func (c *cache) DumpInfo() Info { func (c *Cache) DumpInfo() Info {
return Info{ return Info{
Path: c.path, Path: c.path,
} }
} }
// Open opens and initializes database. Reads object counters from the ObjectCounters instance. // Open opens and initializes database. Reads object counters from the ObjectCounters instance.
func (c *cache) Open(readOnly bool) error { func (c *Cache) Open(readOnly bool) error {
err := c.openStore(readOnly) err := c.openStore(readOnly)
if err != nil { if err != nil {
return err return err
} }
// Opening after Close is done during maintenance mode, c.modeMtx.Lock()
// thus we need to create a channel here. defer c.modeMtx.Unlock()
c.closeCh = make(chan struct{})
return c.initCounters() if readOnly {
} c.mode = mode.ReadOnly
} else {
c.mode = mode.ReadWrite
}
// Init runs necessary services.
func (c *cache) Init() error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
defer span.End()
c.initFlushMarks(ctx)
c.runFlushLoop()
return nil return nil
} }
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error { func (c *Cache) Close() error {
// Finish all in-progress operations. // Finish all in-progress operations if they are
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil { // in progress.
select {
case <-c.workersChan:
default:
err := c.setMode(context.TODO(), mode.ReadOnly)
if err != nil {
return err return err
} }
if c.closeCh != nil {
close(c.closeCh)
} }
c.wg.Wait()
if c.closeCh != nil {
c.closeCh = nil
}
c.initialized.Store(false)
var err error var err error
if c.db != nil { if c.db != nil {