engine: Revert Check object existance concurrently #1058
11 changed files with 35 additions and 124 deletions
|
@ -21,22 +21,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
addrKey := addressKey(addr)
|
addrKey := addressKey(addr)
|
||||||
|
|
||||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if isNonDataBucket(bucketName) {
|
if isNonDataBucket(bucketName) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,12 +51,6 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return GetRes{}, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
data []byte
|
data []byte
|
||||||
addrKey = addressKey(prm.addr)
|
addrKey = addressKey(prm.addr)
|
||||||
|
@ -64,12 +58,6 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
|
|
||||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if isNonDataBucket(bucketName) {
|
if isNonDataBucket(bucketName) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,11 +94,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
||||||
//
|
//
|
||||||
// returns error if object could not be read from any blobovnicza of the same level.
|
// returns error if object could not be read from any blobovnicza of the same level.
|
||||||
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
|
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return common.GetRes{}, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// open blobovnicza (cached inside)
|
// open blobovnicza (cached inside)
|
||||||
shBlz := b.getBlobovnicza(blzPath)
|
shBlz := b.getBlobovnicza(blzPath)
|
||||||
blz, err := shBlz.Open()
|
blz, err := shBlz.Open()
|
||||||
|
|
|
@ -230,12 +230,6 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
||||||
|
@ -258,12 +252,6 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
|
||||||
|
|
||||||
if len(dbIdxs) > 0 {
|
if len(dbIdxs) > 0 {
|
||||||
for _, dbIdx := range dbIdxs {
|
for _, dbIdx := range dbIdxs {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
|
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
|
||||||
stop, err := f(dbPath)
|
stop, err := f(dbPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -278,12 +266,6 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
|
||||||
if len(dirIdxs) > 0 {
|
if len(dirIdxs) > 0 {
|
||||||
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
|
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
|
||||||
for _, dirIdx := range dirIdxs {
|
for _, dirIdx := range dirIdxs {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
|
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
|
||||||
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
|
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -3,7 +3,6 @@ package blobstor
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -58,30 +57,27 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
|
||||||
// error | found | log the error, return true, nil
|
// error | found | log the error, return true, nil
|
||||||
// error | not found | return the error
|
// error | not found | return the error
|
||||||
// error | error | log the first error, return the second
|
// error | error | log the first error, return the second
|
||||||
var storageErrors []error
|
var errors []error
|
||||||
for i := range b.storage {
|
for i := range b.storage {
|
||||||
res, err := b.storage[i].Storage.Exists(ctx, prm)
|
res, err := b.storage[i].Storage.Exists(ctx, prm)
|
||||||
if err == nil && res.Exists {
|
if err == nil && res.Exists {
|
||||||
exists = true
|
exists = true
|
||||||
return res, nil
|
return res, nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
if errors.Is(err, context.Canceled) {
|
errors = append(errors, err)
|
||||||
return common.ExistsRes{}, err
|
|
||||||
}
|
|
||||||
storageErrors = append(storageErrors, err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(storageErrors) == 0 {
|
if len(errors) == 0 {
|
||||||
return common.ExistsRes{}, nil
|
return common.ExistsRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, err := range storageErrors[:len(storageErrors)-1] {
|
for _, err := range errors[:len(errors)-1] {
|
||||||
b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
|
b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
|
||||||
zap.Stringer("address", prm.Address),
|
zap.Stringer("address", prm.Address),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
}
|
}
|
||||||
|
|
||||||
return common.ExistsRes{}, storageErrors[len(storageErrors)-1]
|
return common.ExistsRes{}, errors[len(errors)-1]
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,12 +285,6 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return common.ExistsRes{}, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
p := t.treePath(prm.Address)
|
p := t.treePath(prm.Address)
|
||||||
|
|
||||||
_, err := os.Stat(p)
|
_, err := os.Stat(p)
|
||||||
|
|
|
@ -39,12 +39,6 @@ func BenchmarkExists(b *testing.B) {
|
||||||
b.Run("8 shards", func(b *testing.B) {
|
b.Run("8 shards", func(b *testing.B) {
|
||||||
benchmarkExists(b, 8)
|
benchmarkExists(b, 8)
|
||||||
})
|
})
|
||||||
b.Run("12 shards", func(b *testing.B) {
|
|
||||||
benchmarkExists(b, 12)
|
|
||||||
})
|
|
||||||
b.Run("16 shards", func(b *testing.B) {
|
|
||||||
benchmarkExists(b, 12)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkExists(b *testing.B, shardNum int) {
|
func benchmarkExists(b *testing.B, shardNum int) {
|
||||||
|
|
|
@ -3,70 +3,54 @@ package engine
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
|
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
|
||||||
var shPrm shard.ExistsPrm
|
var shPrm shard.ExistsPrm
|
||||||
shPrm.SetAddress(addr)
|
shPrm.SetAddress(addr)
|
||||||
|
alreadyRemoved := false
|
||||||
var exists atomic.Bool
|
exists := false
|
||||||
eg, egCtx := errgroup.WithContext(ctx)
|
|
||||||
egCtx, cancel := context.WithCancel(egCtx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
select {
|
res, err := sh.Exists(ctx, shPrm)
|
||||||
case <-egCtx.Done():
|
if err != nil {
|
||||||
return true
|
if client.IsErrObjectAlreadyRemoved(err) {
|
||||||
default:
|
alreadyRemoved = true
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
var siErr *objectSDK.SplitInfoError
|
||||||
|
if errors.As(err, &siErr) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if shard.IsErrObjectExpired(err) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !client.IsErrObjectNotFound(err) {
|
||||||
|
e.reportShardError(sh, "could not check existence of object in shard", err)
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
eg.Go(func() error {
|
if !exists {
|
||||||
res, err := sh.Exists(egCtx, shPrm)
|
exists = res.Exists()
|
||||||
if err != nil {
|
}
|
||||||
if errors.Is(err, context.Canceled) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if client.IsErrObjectAlreadyRemoved(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var siErr *objectSDK.SplitInfoError
|
|
||||||
if errors.As(err, &siErr) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if shard.IsErrObjectExpired(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !client.IsErrObjectNotFound(err) {
|
|
||||||
e.reportShardError(sh, "could not check existence of object in shard", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if res.Exists() {
|
|
||||||
exists.Store(true)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
err := eg.Wait()
|
if alreadyRemoved {
|
||||||
if client.IsErrObjectAlreadyRemoved(err) {
|
|
||||||
return false, new(apistatus.ObjectAlreadyRemoved)
|
return false, new(apistatus.ObjectAlreadyRemoved)
|
||||||
}
|
}
|
||||||
|
|
||||||
return exists.Load(), nil
|
return exists, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,6 +78,8 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
|
|
||||||
addr := object.AddressOf(prm.obj)
|
addr := object.AddressOf(prm.obj)
|
||||||
|
|
||||||
|
// In #1146 this check was parallelized, however, it became
|
||||||
|
// much slower on fast machines for 4 shards.
|
||||||
_, err := e.exists(ctx, addr)
|
_, err := e.exists(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -67,12 +67,6 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return res, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
|
|
@ -47,12 +47,6 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ExistsRes{}, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
var exists bool
|
var exists bool
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue