From f526f49995575541626135e7d4542c0e014b63f2 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 20 Dec 2023 17:18:28 +0300 Subject: [PATCH] [#874] engine: Check object existence concurrently Signed-off-by: Dmitrii Stepanov --- .../blobovnicza/exists.go | 12 ++++ pkg/local_object_storage/blobovnicza/get.go | 12 ++++ .../blobstor/blobovniczatree/get.go | 5 ++ .../blobstor/blobovniczatree/iterate.go | 18 +++++ pkg/local_object_storage/blobstor/exists.go | 14 ++-- .../blobstor/fstree/fstree.go | 6 ++ .../engine/engine_test.go | 6 ++ pkg/local_object_storage/engine/exists.go | 72 +++++++++++-------- pkg/local_object_storage/engine/put.go | 2 - pkg/local_object_storage/metabase/exists.go | 6 ++ pkg/local_object_storage/shard/exists.go | 6 ++ 11 files changed, 124 insertions(+), 35 deletions(-) diff --git a/pkg/local_object_storage/blobovnicza/exists.go b/pkg/local_object_storage/blobovnicza/exists.go index f7bc84d4a..b5d723ee6 100644 --- a/pkg/local_object_storage/blobovnicza/exists.go +++ b/pkg/local_object_storage/blobovnicza/exists.go @@ -21,10 +21,22 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error )) defer span.End() + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + addrKey := addressKey(addr) err := b.boltDB.View(func(tx *bbolt.Tx) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if isNonDataBucket(bucketName) { return nil } diff --git a/pkg/local_object_storage/blobovnicza/get.go b/pkg/local_object_storage/blobovnicza/get.go index 600323f55..36cf69d54 100644 --- a/pkg/local_object_storage/blobovnicza/get.go +++ b/pkg/local_object_storage/blobovnicza/get.go @@ -51,6 +51,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) { )) defer span.End() + select { + case <-ctx.Done(): + return GetRes{}, ctx.Err() + default: + } + var ( data []byte addrKey = addressKey(prm.addr) @@ 
-58,6 +64,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) { if err := b.boltDB.View(func(tx *bbolt.Tx) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if isNonDataBucket(bucketName) { return nil } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get.go b/pkg/local_object_storage/blobstor/blobovniczatree/get.go index 08cacda8a..5f1856673 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get.go @@ -94,6 +94,11 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G // // returns error if object could not be read from any blobovnicza of the same level. func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) { + select { + case <-ctx.Done(): + return common.GetRes{}, ctx.Err() + default: + } // open blobovnicza (cached inside) shBlz := b.getBlobovnicza(blzPath) blz, err := shBlz.Open() diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go index 92014fd55..942d73a12 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go @@ -230,6 +230,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres } func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + sysPath := filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode @@ -252,6 +258,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx 
context.Context, path st if len(dbIdxs) > 0 { for _, dbIdx := range dbIdxs { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx)) stop, err := f(dbPath) if err != nil { @@ -266,6 +278,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st if len(dirIdxs) > 0 { hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path)) for _, dirIdx := range dirIdxs { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + dirPath := filepath.Join(path, u64ToHexString(dirIdx)) stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f) if err != nil { diff --git a/pkg/local_object_storage/blobstor/exists.go b/pkg/local_object_storage/blobstor/exists.go index 43feec7c9..21b4016d2 100644 --- a/pkg/local_object_storage/blobstor/exists.go +++ b/pkg/local_object_storage/blobstor/exists.go @@ -3,6 +3,7 @@ package blobstor import ( "context" "encoding/hex" + "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -57,27 +58,30 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi // error | found | log the error, return true, nil // error | not found | return the error // error | error | log the first error, return the second - var errors []error + var storageErrors []error for i := range b.storage { res, err := b.storage[i].Storage.Exists(ctx, prm) if err == nil && res.Exists { exists = true return res, nil } else if err != nil { - errors = append(errors, err) + if errors.Is(err, context.Canceled) { + return common.ExistsRes{}, err + } + storageErrors = append(storageErrors, err) } } - if len(errors) == 0 { + if len(storageErrors) == 0 { return common.ExistsRes{}, nil } - for _, err := range errors[:len(errors)-1] { + for _, err := range storageErrors[:len(storageErrors)-1] { b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking, zap.Stringer("address", prm.Address), zap.String("error", 
err.Error()), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } - return common.ExistsRes{}, errors[len(errors)-1] + return common.ExistsRes{}, storageErrors[len(storageErrors)-1] } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 948872fd2..af241becf 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -302,6 +302,12 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist )) defer span.End() + select { + case <-ctx.Done(): + return common.ExistsRes{}, ctx.Err() + default: + } + p := t.treePath(prm.Address) _, err := os.Stat(p) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index a62734c0a..b20f45be5 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -39,6 +39,12 @@ func BenchmarkExists(b *testing.B) { b.Run("8 shards", func(b *testing.B) { benchmarkExists(b, 8) }) + b.Run("12 shards", func(b *testing.B) { + benchmarkExists(b, 12) + }) + b.Run("16 shards", func(b *testing.B) { + benchmarkExists(b, 16) + }) } func benchmarkExists(b *testing.B, shardNum int) { diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index ef6292768..ee4dad341 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -3,54 +3,70 @@ package engine import ( "context" "errors" + "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "golang.org/x/sync/errgroup" ) func (e *StorageEngine) exists(ctx context.Context, addr 
oid.Address) (bool, error) { var shPrm shard.ExistsPrm shPrm.SetAddress(addr) - alreadyRemoved := false - exists := false + + var exists atomic.Bool + eg, egCtx := errgroup.WithContext(ctx) + egCtx, cancel := context.WithCancel(egCtx) + defer cancel() e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - res, err := sh.Exists(ctx, shPrm) - if err != nil { - if client.IsErrObjectAlreadyRemoved(err) { - alreadyRemoved = true - - return true - } - - var siErr *objectSDK.SplitInfoError - if errors.As(err, &siErr) { - return true - } - - if shard.IsErrObjectExpired(err) { - return true - } - - if !client.IsErrObjectNotFound(err) { - e.reportShardError(sh, "could not check existence of object in shard", err) - } - return false + select { + case <-egCtx.Done(): + return true + default: } - if !exists { - exists = res.Exists() - } + eg.Go(func() error { + res, err := sh.Exists(egCtx, shPrm) + if err != nil { + if errors.Is(err, context.Canceled) { + return err + } + if client.IsErrObjectAlreadyRemoved(err) { + return err + } + + var siErr *objectSDK.SplitInfoError + if errors.As(err, &siErr) { + return err + } + + if shard.IsErrObjectExpired(err) { + return err + } + + if !client.IsErrObjectNotFound(err) { + e.reportShardError(sh, "could not check existence of object in shard", err) + } + return nil + } + if res.Exists() { + exists.Store(true) + cancel() + } + return nil + }) return false }) - if alreadyRemoved { + err := eg.Wait() + if client.IsErrObjectAlreadyRemoved(err) { return false, new(apistatus.ObjectAlreadyRemoved) } - return exists, nil + return exists.Load(), nil } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 79ee3a997..7ce915ad8 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -63,8 +63,6 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { addr := object.AddressOf(prm.obj) - // In #1146 this check was parallelized, 
however, it became - // much slower on fast machines for 4 shards. _, err := e.exists(ctx, addr) if err != nil { return err diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index aa9aba106..bca18e21f 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -67,6 +67,12 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err return res, ErrDegradedMode } + select { + case <-ctx.Done(): + return res, ctx.Err() + default: + } + currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.View(func(tx *bbolt.Tx) error { diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 2cdb8dfa8..7296426a6 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -47,6 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { )) defer span.End() + select { + case <-ctx.Done(): + return ExistsRes{}, ctx.Err() + default: + } + var exists bool var err error