From 8e4cd78cc8b5abfe940fb56cff28e7d1d80b2c02 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 24 Jul 2024 14:10:22 +0300 Subject: [PATCH] [#9999] writecache: Flush writecache concurrently Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-lens/internal/meta/inspect.go | 6 +- internal/logs/logs.go | 1 + .../metabase/storage_id.go | 38 ++----- .../metabase/storage_id_test.go | 11 +- pkg/local_object_storage/shard/delete.go | 6 +- .../shard/gc_internal_test.go | 7 +- pkg/local_object_storage/shard/get.go | 5 +- pkg/local_object_storage/shard/rebuilder.go | 4 +- .../writecache/benchmark/writecache_test.go | 26 ++++- pkg/local_object_storage/writecache/cache.go | 10 +- pkg/local_object_storage/writecache/flush.go | 104 ++++++++++++++---- .../writecache/flush_test.go | 6 +- .../writecache/writecache.go | 1 + 13 files changed, 138 insertions(+), 87 deletions(-) diff --git a/cmd/frostfs-lens/internal/meta/inspect.go b/cmd/frostfs-lens/internal/meta/inspect.go index 9eb60f966..205c71dc6 100644 --- a/cmd/frostfs-lens/internal/meta/inspect.go +++ b/cmd/frostfs-lens/internal/meta/inspect.go @@ -33,13 +33,11 @@ func inspectFunc(cmd *cobra.Command, _ []string) { db := openMeta(cmd) defer db.Close() - storageID := meta.StorageIDPrm{} - storageID.SetAddress(addr) - + storageID := meta.StorageIDPrm{Address: addr} resStorageID, err := db.StorageID(cmd.Context(), storageID) common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err)) - if id := resStorageID.StorageID(); id != nil { + if id := resStorageID.StorageID; id != nil { cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path()) } else { cmd.Printf("Object does not contain storageID\n\n") diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 1a0bda00d..f73866861 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -468,6 +468,7 @@ const ( FSTreeCantUnmarshalObject = "can't unmarshal an object" FSTreeCantFushObjectBlobstor = "can't flush an object to blobstor" FSTreeCantUpdateID = "can't update object storage ID" + FSTreeCantGetID = "can't get object storage ID" FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB" PutSingleRedirectFailure = "failed to redirect PutSingle request" StorageIDRetrievalFailure = "can't get storage ID from metabase" diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 6d620b41a..657334d17 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -15,22 +15,12 @@ import ( // StorageIDPrm groups the parameters of StorageID operation. type StorageIDPrm struct { - addr oid.Address + Address oid.Address } // StorageIDRes groups the resulting values of StorageID operation. type StorageIDRes struct { - id []byte -} - -// SetAddress is a StorageID option to set the object address to check. -func (p *StorageIDPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// StorageID returns storage ID. -func (r StorageIDRes) StorageID() []byte { - return r.id + StorageID []byte } // StorageID returns storage descriptor for objects from the blobstor. @@ -46,7 +36,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes _, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID", trace.WithAttributes( - attribute.String("address", prm.addr.EncodeToString()), + attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() @@ -58,7 +48,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes } err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.id, err = db.storageID(tx, prm.addr) + res.StorageID, err = db.storageID(tx, prm.Address) return err }) @@ -83,23 +73,13 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) { // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation. type UpdateStorageIDPrm struct { - addr oid.Address - id []byte + Address oid.Address + StorageID []byte } // UpdateStorageIDRes groups the resulting values of UpdateStorageID operation. type UpdateStorageIDRes struct{} -// SetAddress is an UpdateStorageID option to set the object address to check. -func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// SetStorageID is an UpdateStorageID option to set the storage ID. -func (p *UpdateStorageIDPrm) SetStorageID(id []byte) { - p.id = id -} - // UpdateStorageID updates storage descriptor for objects from the blobstor. func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) { var ( @@ -112,8 +92,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res _, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID", trace.WithAttributes( - attribute.String("address", prm.addr.EncodeToString()), - attribute.String("storage_id", string(prm.id)), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), )) defer span.End() @@ -127,7 +107,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res } err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - return setStorageID(tx, prm.addr, prm.id, true) + return setStorageID(tx, prm.Address, prm.StorageID, true) }) success = err == nil return res, metaerr.Wrap(err) diff --git a/pkg/local_object_storage/metabase/storage_id_test.go b/pkg/local_object_storage/metabase/storage_id_test.go index aaf6480ab..d6e4a2290 100644 --- a/pkg/local_object_storage/metabase/storage_id_test.go +++ b/pkg/local_object_storage/metabase/storage_id_test.go @@ -102,18 +102,13 @@ func TestPutWritecacheDataRace(t *testing.T) { } func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error { - var sidPrm meta.UpdateStorageIDPrm - sidPrm.SetAddress(addr) - sidPrm.SetStorageID(id) - + sidPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: id} _, err := db.UpdateStorageID(context.Background(), sidPrm) return err } func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) { - var sidPrm meta.StorageIDPrm - sidPrm.SetAddress(addr) - + sidPrm := meta.StorageIDPrm{Address: addr} r, err := db.StorageID(context.Background(), sidPrm) - return r.StorageID(), err + return r.StorageID, err } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index c898fdf41..4b57f82b6 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -105,9 +105,7 @@ func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr } func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error { - var sPrm meta.StorageIDPrm - sPrm.SetAddress(addr) - + sPrm := meta.StorageIDPrm{Address: addr} res, err := s.metaBase.StorageID(ctx, sPrm) if err != nil { s.log.Debug(logs.StorageIDRetrievalFailure, @@ -116,7 +114,7 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } - storageID := res.StorageID() + storageID := res.StorageID if storageID == nil { // if storageID is nil it means: // 1. there is no such object diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go index 3993593ad..5e4b9b5a2 100644 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ b/pkg/local_object_storage/shard/gc_internal_test.go @@ -109,15 +109,14 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { require.True(t, client.IsErrObjectNotFound(err), "invalid error type") // storageID - var metaStIDPrm meta.StorageIDPrm - metaStIDPrm.SetAddress(addr) + metaStIDPrm := meta.StorageIDPrm{Address: addr} storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm) require.NoError(t, err, "failed to get storage ID") // check existence in blobstore var bsExisted common.ExistsPrm bsExisted.Address = addr - bsExisted.StorageID = storageID.StorageID() + bsExisted.StorageID = storageID.StorageID exRes, err := sh.blobStor.Exists(context.Background(), bsExisted) require.NoError(t, err, "failed to check blobstore existence") require.True(t, exRes.Exists, "invalid blobstore existence result") @@ -125,7 +124,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { // drop from blobstor var bsDeletePrm common.DeletePrm bsDeletePrm.Address = addr - bsDeletePrm.StorageID = storageID.StorageID() + bsDeletePrm.StorageID = storageID.StorageID _, err = sh.blobStor.Delete(context.Background(), bsDeletePrm) require.NoError(t, err, "failed to delete from blobstore") diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 2e7c84bcd..8b43b4fc3 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -160,15 +160,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta return res, false, err } - var mPrm meta.StorageIDPrm - mPrm.SetAddress(addr) + mPrm := meta.StorageIDPrm{Address: addr} mExRes, err := s.metaBase.StorageID(ctx, mPrm) if err != nil { return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) } - storageID := mExRes.StorageID() + storageID := mExRes.StorageID if storageID == nil { // `nil` storageID returned without any error // means that object is big, `cb` expects an diff --git a/pkg/local_object_storage/shard/rebuilder.go b/pkg/local_object_storage/shard/rebuilder.go index f18573c57..06a254319 100644 --- a/pkg/local_object_storage/shard/rebuilder.go +++ b/pkg/local_object_storage/shard/rebuilder.go @@ -90,9 +90,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres return errMBIsNotAvailable } - var prm meta.UpdateStorageIDPrm - prm.SetAddress(addr) - prm.SetStorageID(storageID) + prm := meta.UpdateStorageIDPrm{Address: addr, StorageID: storageID} _, err := u.mb.UpdateStorageID(ctx, prm) return err } diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index 8d3ba4b74..495f66b0d 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -2,6 +2,7 @@ package benchmark import ( "context" + "sync" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -10,6 +11,7 @@ import ( meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -80,12 +82,30 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) { require.NoError(b, cache.Init(), "initializing") } -type testMetabase struct{} +type testMetabase struct { + storageIDs map[oid.Address][]byte + guard *sync.RWMutex +} -func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { +func (t *testMetabase) UpdateStorageID(_ context.Context, prm meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) { + t.guard.Lock() + defer t.guard.Unlock() + t.storageIDs[prm.Address] = prm.StorageID return meta.UpdateStorageIDRes{}, nil } +func (t *testMetabase) StorageID(_ context.Context, prm meta.StorageIDPrm) (meta.StorageIDRes, error) { + t.guard.RLock() + defer t.guard.RUnlock() + + if id, found := t.storageIDs[prm.Address]; found { + return meta.StorageIDRes{ + StorageID: id, + }, nil + } + return meta.StorageIDRes{}, nil +} + func newCache(b *testing.B) writecache.Cache { bs := teststore.New( teststore.WithPut(func(pp common.PutPrm) (common.PutRes, error) { return common.PutRes{}, nil }), @@ -93,7 +113,7 @@ func newCache(b *testing.B) writecache.Cache { return writecache.New( writecache.WithPath(b.TempDir()), writecache.WithBlobstor(bs), - writecache.WithMetabase(testMetabase{}), + writecache.WithMetabase(&testMetabase{storageIDs: make(map[oid.Address][]byte), guard: &sync.RWMutex{}}), writecache.WithMaxCacheSize(256<<30), ) } diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index e57f4fc5e..a6d055dba 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -10,6 +10,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -26,6 +28,9 @@ type cache struct { // whether object should be compressed. compressFlags map[string]struct{} + flushCh chan objectInfo + flushingGuard *utilSync.KeyLocker[oid.Address] + // cancel is cancel function, protected by modeMtx in Close. cancel atomic.Value // wg is a wait group for flush workers. @@ -47,8 +52,9 @@ var dummyCanceler context.CancelFunc = func() {} // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ - mode: mode.Disabled, - + mode: mode.Disabled, + flushCh: make(chan objectInfo), + flushingGuard: utilSync.NewKeyLocker[oid.Address](), compressFlags: make(map[string]struct{}), options: options{ log: &logger.Logger{Logger: zap.NewNop()}, diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index d595ddc6c..105d7a189 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -6,7 +6,6 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -14,6 +13,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -32,33 +32,78 @@ func (c *cache) runFlushLoop(ctx context.Context) { return } + for i := 0; i < c.workersCount; i++ { + c.wg.Add(1) + go c.workerFlush(ctx) + } + c.wg.Add(1) - go func() { - c.workerFlushBig(ctx) - c.wg.Done() - }() + go c.workerSelect(ctx) } -func (c *cache) workerFlushBig(ctx context.Context) { +func (c *cache) workerSelect(ctx context.Context) { + defer c.wg.Done() tick := time.NewTicker(defaultFlushInterval) for { select { case <-tick.C: - c.modeMtx.RLock() - if c.readOnly() || c.noMetabase() { - c.modeMtx.RUnlock() - break + var prm common.IteratePrm + prm.IgnoreErrors = true + prm.Handler = func(ie common.IterationElement) error { + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.readOnly() { + return ErrReadOnly + } + if c.noMetabase() { + return ErrDegraded + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.flushCh <- objectInfo{ + data: ie.ObjectData, + address: ie.Address, + }: + return nil + } } - - _ = c.flushFSTree(ctx, true) - - c.modeMtx.RUnlock() + _, _ = c.fsTree.Iterate(ctx, prm) case <-ctx.Done(): return } } } +func (c *cache) workerFlush(ctx context.Context) { + defer c.wg.Done() + + var objInfo objectInfo + for { + select { + case objInfo = <-c.flushCh: + case <-ctx.Done(): + return + } + + var obj objectSDK.Object + err := obj.Unmarshal(objInfo.data) + if err != nil { + c.reportFlushError(logs.FSTreeCantUnmarshalObject, objInfo.address.EncodeToString(), metaerr.Wrap(err)) + continue + } + + err = c.flushObject(ctx, objInfo.address, &obj, objInfo.data) + if err != nil { + // Error is handled in flushObject. + continue + } + + c.deleteFromDisk(ctx, objInfo.address) + } +} + func (c *cache) reportFlushError(msg string, addr string, err error) { if c.reportError != nil { c.reportError(msg, err) @@ -85,7 +130,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree) + err = c.flushObject(ctx, e.Address, &obj, e.ObjectData) if err != nil { if ignoreErrors { return nil @@ -102,15 +147,25 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error { - var err error +func (c *cache) flushObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object, data []byte) error { + c.flushingGuard.Lock(addr) + defer c.flushingGuard.Unlock(addr) + + stPrm := meta.StorageIDPrm{Address: addr} + stRes, err := c.metabase.StorageID(ctx, stPrm) + if err != nil { + c.reportFlushError(logs.FSTreeCantGetID, addr.EncodeToString(), err) + return err + } + if stRes.StorageID != nil { + // already flushed + return nil + } defer func() { - c.metrics.Flush(err == nil, st) + c.metrics.Flush(err == nil, StorageTypeFSTree) }() - addr := objectCore.AddressOf(obj) - var prm common.PutPrm prm.Object = obj prm.RawData = data @@ -125,9 +180,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b return err } - var updPrm meta.UpdateStorageIDPrm - updPrm.SetAddress(addr) - updPrm.SetStorageID(res.StorageID) + updPrm := meta.UpdateStorageIDPrm{Address: addr, StorageID: res.StorageID} _, err = c.metabase.UpdateStorageID(ctx, updPrm) if err != nil { @@ -169,3 +222,8 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.flushFSTree(ctx, ignoreErrors) } + +type objectInfo struct { + data []byte + address oid.Address +} diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 861be6fd2..3038f6470 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -226,15 +226,13 @@ func putObjects(t *testing.T, c Cache) []objectPair { func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) { for i := range objects { - var mPrm meta.StorageIDPrm - mPrm.SetAddress(objects[i].addr) - + mPrm := meta.StorageIDPrm{Address: objects[i].addr} mRes, err := mb.StorageID(context.Background(), mPrm) require.NoError(t, err) var prm common.GetPrm prm.Address = objects[i].addr - prm.StorageID = mRes.StorageID() + prm.StorageID = mRes.StorageID res, err := bs.Get(context.Background(), prm) require.NoError(t, err) diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 76ea84eda..960137dfb 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -55,6 +55,7 @@ type MainStorage interface { // Metabase is the interface of the metabase used by Cache implementations. type Metabase interface { UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) + StorageID(context.Context, meta.StorageIDPrm) (meta.StorageIDRes, error) } var (