engine: Check object existence concurrently #912
@@ -21,10 +21,22 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
 	))
 	defer span.End()

+	select {
+	case <-ctx.Done():
+		return false, ctx.Err()
+	default:
+	}
+
 	addrKey := addressKey(addr)

 	err := b.boltDB.View(func(tx *bbolt.Tx) error {
 		return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
 			if isNonDataBucket(bucketName) {
 				return nil
 			}
@@ -51,6 +51,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
 	))
 	defer span.End()

+	select {
+	case <-ctx.Done():
+		return GetRes{}, ctx.Err()
+	default:
+	}
+
 	var (
 		data []byte
 		addrKey = addressKey(prm.addr)

@@ -58,6 +64,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {

 	if err := b.boltDB.View(func(tx *bbolt.Tx) error {
 		return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
 			if isNonDataBucket(bucketName) {
 				return nil
 			}
@@ -94,6 +94,11 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
 //
 // returns error if object could not be read from any blobovnicza of the same level.
 func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
+	select {
+	case <-ctx.Done():
+		return common.GetRes{}, ctx.Err()
+	default:
+	}
 	// open blobovnicza (cached inside)
 	shBlz := b.getBlobovnicza(blzPath)
 	blz, err := shBlz.Open()
@@ -230,6 +230,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
 }

 func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
+	select {
+	case <-ctx.Done():
+		return false, ctx.Err()
+	default:
+	}
+
 	sysPath := filepath.Join(b.rootPath, path)
 	entries, err := os.ReadDir(sysPath)
 	if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode

@@ -252,6 +258,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st

 	if len(dbIdxs) > 0 {
 		for _, dbIdx := range dbIdxs {
+			select {
+			case <-ctx.Done():
+				return false, ctx.Err()
+			default:
+			}
+
 			dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
 			stop, err := f(dbPath)
 			if err != nil {

@@ -266,6 +278,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
 	if len(dirIdxs) > 0 {
 		hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
 		for _, dirIdx := range dirIdxs {
+			select {
+			case <-ctx.Done():
+				return false, ctx.Err()
+			default:
+			}
+
 			dirPath := filepath.Join(path, u64ToHexString(dirIdx))
 			stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
 			if err != nil {
@@ -3,6 +3,7 @@ package blobstor

 import (
+	"context"
 	"encoding/hex"
 	"errors"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -57,27 +58,30 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
 	// error | found | log the error, return true, nil
 	// error | not found | return the error
 	// error | error | log the first error, return the second
-	var errors []error
+	var storageErrors []error
 	for i := range b.storage {
 		res, err := b.storage[i].Storage.Exists(ctx, prm)
 		if err == nil && res.Exists {
 			exists = true
 			return res, nil
 		} else if err != nil {
-			errors = append(errors, err)
+			if errors.Is(err, context.Canceled) {
+				return common.ExistsRes{}, err
+			}
+			storageErrors = append(storageErrors, err)
 		}
 	}

-	if len(errors) == 0 {
+	if len(storageErrors) == 0 {
 		return common.ExistsRes{}, nil
 	}

-	for _, err := range errors[:len(errors)-1] {
+	for _, err := range storageErrors[:len(storageErrors)-1] {
 		b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
 			zap.Stringer("address", prm.Address),
 			zap.String("error", err.Error()),
 			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 	}

-	return common.ExistsRes{}, errors[len(errors)-1]
+	return common.ExistsRes{}, storageErrors[len(storageErrors)-1]
 }
@@ -302,6 +302,12 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist
 	))
 	defer span.End()

+	select {
+	case <-ctx.Done():
+		return common.ExistsRes{}, ctx.Err()
+	default:
+	}
+
 	p := t.treePath(prm.Address)

 	_, err := os.Stat(p)
@@ -39,6 +39,12 @@ func BenchmarkExists(b *testing.B) {
 	b.Run("8 shards", func(b *testing.B) {
 		benchmarkExists(b, 8)
 	})
+	b.Run("12 shards", func(b *testing.B) {
+		benchmarkExists(b, 12)
+	})
+	b.Run("16 shards", func(b *testing.B) {
+		benchmarkExists(b, 16)
+	})
 }

 func benchmarkExists(b *testing.B, shardNum int) {
@@ -3,54 +3,70 @@ package engine
 import (
 	"context"
 	"errors"
+	"sync/atomic"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"golang.org/x/sync/errgroup"
 )

 func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
 	var shPrm shard.ExistsPrm
 	shPrm.SetAddress(addr)
-	alreadyRemoved := false
-	exists := false
+
+	var exists atomic.Bool
+	eg, egCtx := errgroup.WithContext(ctx)
+	egCtx, cancel := context.WithCancel(egCtx)
+	defer cancel()

 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
-		res, err := sh.Exists(ctx, shPrm)
-		if err != nil {
-			if client.IsErrObjectAlreadyRemoved(err) {
-				alreadyRemoved = true
+		select {
+		case <-egCtx.Done():
+			return true
+		default:
+		}
+
+		eg.Go(func() error {
+			res, err := sh.Exists(egCtx, shPrm)
+			if err != nil {
+				if errors.Is(err, context.Canceled) {
+					return err
+				}
+
+				if client.IsErrObjectAlreadyRemoved(err) {
+					return err
+				}

 			var siErr *objectSDK.SplitInfoError
 			if errors.As(err, &siErr) {
-				return true
+				return err
 			}

 			if shard.IsErrObjectExpired(err) {
-				return true
+				return err
 			}

 			if !client.IsErrObjectNotFound(err) {
 				e.reportShardError(sh, "could not check existence of object in shard", err)
 			}
-			return false
+			return nil
 		}

-		if !exists {
-			exists = res.Exists()
+		if res.Exists() {
fyrchik commented:
How is it different from `Store(true)`? I would expect `Store` to have less overhead and to be more obvious; calling `cancel()` twice is allowed too. Don't mind using your approach either.

dstepanov-yadro commented:
`cancel` should be called only if the object is found, so we have to check `res.Exists()`. Or I don't understand your point.

fyrchik commented:
I mean `if res.Exists() { exists.Store(true); cancel() }` instead of `CompareAndSwap`. In my approach the expected number of atomic operations is 1.

dstepanov-yadro commented:
Fixed. Also dropped the comment about concurrent exists.
+				exists.Store(true)
+				cancel()
+			}
+
+			return nil
+		})
 		return false
 	})

-	if alreadyRemoved {
+	err := eg.Wait()
+	if client.IsErrObjectAlreadyRemoved(err) {
 		return false, new(apistatus.ObjectAlreadyRemoved)
 	}

-	return exists, nil
+	return exists.Load(), nil
 }
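For readers less familiar with this pattern, here is a minimal, self-contained sketch (not the PR code) of the shape the reviewed `exists()` converges on after the `Store`/`CompareAndSwap` discussion above: per-shard checks run concurrently in an errgroup, the first positive answer is recorded in an `atomic.Bool`, and `cancel()` stops the remaining checks. `existsInAny` and its `check` callbacks are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

// existsInAny reports whether any of the given checks finds the object.
// It mirrors the structure of the new StorageEngine.exists: an errgroup for
// concurrency, an atomic.Bool for the answer, and an extra cancel to stop
// the remaining checks as soon as one of them succeeds.
func existsInAny(ctx context.Context, checks []func(context.Context) (bool, error)) (bool, error) {
	var exists atomic.Bool

	eg, egCtx := errgroup.WithContext(ctx)
	egCtx, cancel := context.WithCancel(egCtx)
	defer cancel()

	for _, check := range checks {
		check := check
		if egCtx.Err() != nil {
			break // an earlier check already answered (or failed): stop scheduling
		}
		eg.Go(func() error {
			found, err := check(egCtx)
			if err != nil {
				return err // includes context.Canceled after a successful hit elsewhere
			}
			if found {
				exists.Store(true)
				cancel() // stop the checks that are still running
			}
			return nil
		})
	}

	// Unlike the PR, which only inspects the error for "already removed",
	// this sketch treats cancellation caused by a hit as a non-error.
	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return false, err
	}
	return exists.Load(), nil
}

func main() {
	slow := func(ctx context.Context) (bool, error) {
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-time.After(time.Second):
			return false, nil
		}
	}
	fast := func(_ context.Context) (bool, error) { return true, nil }

	found, err := existsInAny(context.Background(), []func(context.Context) (bool, error){slow, fast})
	fmt.Println(found, err) // true <nil>, returned well before the slow check's timeout
}
```

The extra `context.WithCancel` layered on top of the errgroup context is what lets a successful check stop its siblings without having to return an error from `eg.Go`.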
@@ -63,8 +63,6 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {

 	addr := object.AddressOf(prm.obj)

-	// In #1146 this check was parallelized, however, it became
-	// much slower on fast machines for 4 shards.
 	_, err := e.exists(ctx, addr)
 	if err != nil {
 		return err
@@ -67,6 +67,12 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 		return res, ErrDegradedMode
 	}

+	select {
+	case <-ctx.Done():
+		return res, ctx.Err()
+	default:
+	}
+
 	currEpoch := db.epochState.CurrentEpoch()

 	err = db.boltDB.View(func(tx *bbolt.Tx) error {

fyrchik marked this conversation as resolved (outdated).

fyrchik commented:
Do all these `select` optimizations affect running speed? Asking because context expiring in `Exists()` is not likely: it is the first step of put and much faster than the expected operation time.

dstepanov-yadro commented:
These `select`s are mostly needed for the blobovnicza tree. I think it is good practice to check `ctx.Done()` before any IO operation.
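The check discussed above is the small non-blocking `select` repeated throughout this PR. As an illustration only (the PR inlines the `select` rather than adding a helper), the pattern is equivalent to a hypothetical helper like `checkContext` below; note that calling `ctx.Err()` directly gives the same non-blocking answer.

```go
// checkContext is a hypothetical helper, not part of this PR: it reports the
// context error, if any, without blocking, so a caller can bail out before
// starting a bbolt transaction or filesystem access.
func checkContext(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

// Usage sketch for the metabase hunk above:
//
//	if err := checkContext(ctx); err != nil {
//		return res, err
//	}
//	currEpoch := db.epochState.CurrentEpoch()
```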
@@ -47,6 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 	))
 	defer span.End()

+	select {
+	case <-ctx.Done():
+		return ExistsRes{}, ctx.Err()
+	default:
+	}
+
 	var exists bool
 	var err error
Why not perform `cancel` here too? Looks like the result will be obtained faster in this case.

Fixed, thanks.

Oh, no, it's ok: if `eg.Go` returns an error, then egCtx will be cancelled by errgroup.
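That last point is standard errgroup behaviour: the context returned by `errgroup.WithContext` is cancelled the first time a function passed to `Go` returns a non-nil error. A minimal standalone demonstration (not from the PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, egCtx := errgroup.WithContext(context.Background())

	eg.Go(func() error {
		return errors.New("boom") // the first non-nil error cancels egCtx
	})

	eg.Go(func() error {
		<-egCtx.Done() // unblocks because of the error returned above
		return egCtx.Err()
	})

	fmt.Println(eg.Wait())                                 // boom
	fmt.Println(errors.Is(egCtx.Err(), context.Canceled)) // true
}
```

So in `exists()`, a shard check that fails with a hard error cancels egCtx just as a successful check does via the explicit `cancel()`, which is why no extra `cancel()` call is needed in the error branch.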