engine: Revert Check object existance concurrently #1058

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/revert_concurrent_exist into master 2024-04-01 08:42:36 +00:00
11 changed files with 35 additions and 124 deletions

View file

@ -21,22 +21,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
addrKey := addressKey(addr) addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error { err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if isNonDataBucket(bucketName) { if isNonDataBucket(bucketName) {
return nil return nil
} }

View file

@ -51,12 +51,6 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return GetRes{}, ctx.Err()
default:
}
var ( var (
data []byte data []byte
addrKey = addressKey(prm.addr) addrKey = addressKey(prm.addr)
@ -64,12 +58,6 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if isNonDataBucket(bucketName) { if isNonDataBucket(bucketName) {
return nil return nil
} }

View file

@ -94,11 +94,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
// //
// returns error if object could not be read from any blobovnicza of the same level. // returns error if object could not be read from any blobovnicza of the same level.
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) { func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
select {
case <-ctx.Done():
return common.GetRes{}, ctx.Err()
default:
}
// open blobovnicza (cached inside) // open blobovnicza (cached inside)
shBlz := b.getBlobovnicza(blzPath) shBlz := b.getBlobovnicza(blzPath)
blz, err := shBlz.Open() blz, err := shBlz.Open()

View file

@ -230,12 +230,6 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
} }
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
sysPath := filepath.Join(b.rootPath, path) sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath) entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
@ -258,12 +252,6 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
if len(dbIdxs) > 0 { if len(dbIdxs) > 0 {
for _, dbIdx := range dbIdxs { for _, dbIdx := range dbIdxs {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx)) dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath) stop, err := f(dbPath)
if err != nil { if err != nil {
@ -278,12 +266,6 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
if len(dirIdxs) > 0 { if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path)) hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs { for _, dirIdx := range dirIdxs {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
dirPath := filepath.Join(path, u64ToHexString(dirIdx)) dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f) stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil { if err != nil {

View file

@ -3,7 +3,6 @@ package blobstor
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -58,30 +57,27 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
// error | found | log the error, return true, nil // error | found | log the error, return true, nil
// error | not found | return the error // error | not found | return the error
// error | error | log the first error, return the second // error | error | log the first error, return the second
var storageErrors []error var errors []error
for i := range b.storage { for i := range b.storage {
res, err := b.storage[i].Storage.Exists(ctx, prm) res, err := b.storage[i].Storage.Exists(ctx, prm)
if err == nil && res.Exists { if err == nil && res.Exists {
exists = true exists = true
return res, nil return res, nil
} else if err != nil { } else if err != nil {
if errors.Is(err, context.Canceled) { errors = append(errors, err)
return common.ExistsRes{}, err
}
storageErrors = append(storageErrors, err)
} }
} }
if len(storageErrors) == 0 { if len(errors) == 0 {
return common.ExistsRes{}, nil return common.ExistsRes{}, nil
} }
for _, err := range storageErrors[:len(storageErrors)-1] { for _, err := range errors[:len(errors)-1] {
b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking, b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
zap.Stringer("address", prm.Address), zap.Stringer("address", prm.Address),
zap.String("error", err.Error()), zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} }
return common.ExistsRes{}, storageErrors[len(storageErrors)-1] return common.ExistsRes{}, errors[len(errors)-1]
} }

View file

@ -285,12 +285,6 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return common.ExistsRes{}, ctx.Err()
default:
}
p := t.treePath(prm.Address) p := t.treePath(prm.Address)
_, err := os.Stat(p) _, err := os.Stat(p)

View file

@ -39,12 +39,6 @@ func BenchmarkExists(b *testing.B) {
b.Run("8 shards", func(b *testing.B) { b.Run("8 shards", func(b *testing.B) {
benchmarkExists(b, 8) benchmarkExists(b, 8)
}) })
b.Run("12 shards", func(b *testing.B) {
benchmarkExists(b, 12)
})
b.Run("16 shards", func(b *testing.B) {
benchmarkExists(b, 12)
})
} }
func benchmarkExists(b *testing.B, shardNum int) { func benchmarkExists(b *testing.B, shardNum int) {

View file

@ -3,70 +3,54 @@ package engine
import ( import (
"context" "context"
"errors" "errors"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"golang.org/x/sync/errgroup"
) )
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) { func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
var shPrm shard.ExistsPrm var shPrm shard.ExistsPrm
shPrm.SetAddress(addr) shPrm.SetAddress(addr)
alreadyRemoved := false
var exists atomic.Bool exists := false
eg, egCtx := errgroup.WithContext(ctx)
egCtx, cancel := context.WithCancel(egCtx)
defer cancel()
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
select { res, err := sh.Exists(ctx, shPrm)
case <-egCtx.Done(): if err != nil {
return true if client.IsErrObjectAlreadyRemoved(err) {
default: alreadyRemoved = true
return true
}
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return true
}
if shard.IsErrObjectExpired(err) {
return true
}
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return false
} }
eg.Go(func() error { if !exists {
res, err := sh.Exists(egCtx, shPrm) exists = res.Exists()
if err != nil { }
if errors.Is(err, context.Canceled) {
return err
}
if client.IsErrObjectAlreadyRemoved(err) {
return err
}
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return err
}
if shard.IsErrObjectExpired(err) {
return err
}
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return nil
}
if res.Exists() {
exists.Store(true)
cancel()
}
return nil
})
return false return false
}) })
err := eg.Wait() if alreadyRemoved {
if client.IsErrObjectAlreadyRemoved(err) {
return false, new(apistatus.ObjectAlreadyRemoved) return false, new(apistatus.ObjectAlreadyRemoved)
} }
return exists.Load(), nil return exists, nil
} }

View file

@ -78,6 +78,8 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
addr := object.AddressOf(prm.obj) addr := object.AddressOf(prm.obj)
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
_, err := e.exists(ctx, addr) _, err := e.exists(ctx, addr)
if err != nil { if err != nil {
return err return err

View file

@ -67,12 +67,6 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, ErrDegradedMode return res, ErrDegradedMode
} }
select {
case <-ctx.Done():
return res, ctx.Err()
default:
}
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error { err = db.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -47,12 +47,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return ExistsRes{}, ctx.Err()
default:
}
var exists bool var exists bool
var err error var err error