engine: Check object existence concurrently #912

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:feat/engine_existance_concurrently into master 2024-01-23 07:16:33 +00:00
11 changed files with 124 additions and 35 deletions

View file

@@ -21,10 +21,22 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
))
defer span.End()
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if isNonDataBucket(bucketName) {
return nil
}

View file

@@ -51,6 +51,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
))
defer span.End()
select {
case <-ctx.Done():
return GetRes{}, ctx.Err()
default:
}
var (
data []byte
addrKey = addressKey(prm.addr)
@@ -58,6 +64,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if isNonDataBucket(bucketName) {
return nil
}

View file

@@ -94,6 +94,11 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
//
// returns error if object could not be read from any blobovnicza of the same level.
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
select {
case <-ctx.Done():
return common.GetRes{}, ctx.Err()
default:
}
// open blobovnicza (cached inside)
shBlz := b.getBlobovnicza(blzPath)
blz, err := shBlz.Open()

View file

@@ -230,6 +230,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
}
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
@@ -252,6 +258,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
if len(dbIdxs) > 0 {
for _, dbIdx := range dbIdxs {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath)
if err != nil {
@@ -266,6 +278,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil {

View file

@@ -3,6 +3,7 @@ package blobstor
import (
"context"
"encoding/hex"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -57,27 +58,30 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
// error | found | log the error, return true, nil
// error | not found | return the error
// error | error | log the first error, return the second
var errors []error
var storageErrors []error
for i := range b.storage {
res, err := b.storage[i].Storage.Exists(ctx, prm)
if err == nil && res.Exists {
exists = true
return res, nil
} else if err != nil {
errors = append(errors, err)
if errors.Is(err, context.Canceled) {
return common.ExistsRes{}, err
}
storageErrors = append(storageErrors, err)
}
}
if len(errors) == 0 {
if len(storageErrors) == 0 {
return common.ExistsRes{}, nil
}
for _, err := range errors[:len(errors)-1] {
for _, err := range storageErrors[:len(storageErrors)-1] {
b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
zap.Stringer("address", prm.Address),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return common.ExistsRes{}, errors[len(errors)-1]
return common.ExistsRes{}, storageErrors[len(storageErrors)-1]
}
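The interleaved old/new lines above are easier to read with the post-patch flow extracted. Below is a minimal sketch of that aggregation policy; `existsAcross`, the `checks` closures, and `warn` are hypothetical stand-ins for the sub-storage loop and the logger, not the frostfs-node API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// existsAcross mirrors the post-patch policy: the first positive hit wins,
// context cancellation is terminal, and when nothing is found all collected
// errors but the last are logged while the last one is returned.
func existsAcross(checks []func() (bool, error), warn func(error)) (bool, error) {
	var storageErrors []error
	for _, check := range checks {
		found, err := check()
		if err == nil && found {
			return true, nil
		}
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return false, err // stop: the caller's context is gone
			}
			storageErrors = append(storageErrors, err)
		}
	}
	if len(storageErrors) == 0 {
		return false, nil
	}
	for _, err := range storageErrors[:len(storageErrors)-1] {
		warn(err) // log all but the last error
	}
	return false, storageErrors[len(storageErrors)-1] // return the last one
}

func main() {
	found, err := existsAcross(
		[]func() (bool, error){
			func() (bool, error) { return false, errors.New("corrupted blobovnicza") },
			func() (bool, error) { return true, nil },
		},
		func(err error) { fmt.Println("warn:", err) },
	)
	fmt.Println(found, err) // true <nil>
}
```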

View file

@@ -302,6 +302,12 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist
))
defer span.End()
select {
case <-ctx.Done():
return common.ExistsRes{}, ctx.Err()
default:
}
p := t.treePath(prm.Address)
_, err := os.Stat(p)

View file

@@ -39,6 +39,12 @@ func BenchmarkExists(b *testing.B) {
b.Run("8 shards", func(b *testing.B) {
benchmarkExists(b, 8)
})
b.Run("12 shards", func(b *testing.B) {
benchmarkExists(b, 12)
})
b.Run("16 shards", func(b *testing.B) {
benchmarkExists(b, 16)
})
}
func benchmarkExists(b *testing.B, shardNum int) {
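As a usage note: these cases run with the standard Go benchmark tooling; the package path below is an assumption based on the files touched in this PR.

```
go test -run='^$' -bench=BenchmarkExists ./pkg/local_object_storage/engine/
```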

View file

@@ -3,54 +3,70 @@ package engine
import (
"context"
"errors"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"golang.org/x/sync/errgroup"
)
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
var shPrm shard.ExistsPrm
shPrm.SetAddress(addr)
alreadyRemoved := false
exists := false
var exists atomic.Bool
eg, egCtx := errgroup.WithContext(ctx)
egCtx, cancel := context.WithCancel(egCtx)
defer cancel()
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Exists(ctx, shPrm)
if err != nil {
if client.IsErrObjectAlreadyRemoved(err) {
alreadyRemoved = true
select {
case <-egCtx.Done():
return true
default:
}
eg.Go(func() error {
res, err := sh.Exists(egCtx, shPrm)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
if client.IsErrObjectAlreadyRemoved(err) {
return err

Why not perform `cancel` here too? Looks like the result would be obtained faster in that case.

Fixed, thanks.

Oh, no, it's OK: if `eg.Go` returns an error, then `egCtx` will be cancelled by the errgroup.
}
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return true
return err
}
if shard.IsErrObjectExpired(err) {
return true
return err
}
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return false
return nil
}
if !exists {
exists = res.Exists()
if res.Exists() {

How is it different from `Store(true)`? I would expect `Store` to have less overhead and be more obvious; calling `cancel()` twice is allowed too. I don't mind using your approach either.

`cancel` should be called only if the object is found, so we have to check `res.Exists()`. Or I don't understand your point.

I mean `if res.Exists() { exists.Store(true); cancel() }` instead of `CompareAndSwap`. In my approach the expected number of atomic operations is 1.

Fixed. Also dropped the comment about concurrent exists.
exists.Store(true)
cancel()
}
return nil
})
return false
})
if alreadyRemoved {
err := eg.Wait()
if client.IsErrObjectAlreadyRemoved(err) {
return false, new(apistatus.ObjectAlreadyRemoved)
}
return exists, nil
return exists.Load(), nil
}
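The review thread above is easier to follow against a self-contained sketch of the pattern: an errgroup fans the existence check out over the shards, a goroutine returning an error makes the errgroup cancel `egCtx` for its siblings, and an explicit `cancel()` on the first positive hit stops the remaining lookups early. Everything here besides the errgroup/atomic pattern itself (in particular `checkShard` and the fixed sleep) is a hypothetical stand-in, not the frostfs-node API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

// checkShard is a hypothetical stand-in for shard.Exists: one blocking,
// cancellable lookup that reports whether the shard holds the object.
func checkShard(ctx context.Context, has bool) (bool, error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	case <-time.After(10 * time.Millisecond): // simulated disk IO
		return has, nil
	}
}

func existsConcurrently(ctx context.Context, shards []bool) (bool, error) {
	var exists atomic.Bool
	eg, egCtx := errgroup.WithContext(ctx)
	egCtx, cancel := context.WithCancel(egCtx)
	defer cancel()

	for _, has := range shards {
		has := has
		select {
		case <-egCtx.Done():
			// A result or error already arrived: stop spawning lookups.
		default:
			eg.Go(func() error {
				found, err := checkShard(egCtx, has)
				if err != nil {
					// A non-nil return makes the errgroup cancel egCtx,
					// which is why no extra cancel() is needed here.
					return err
				}
				if found {
					exists.Store(true)
					cancel() // first positive hit: abort the other shards
				}
				return nil
			})
		}
	}

	// Goroutines cut short by our own cancel() return context.Canceled;
	// that is expected when the object was found, so it is not an error.
	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return false, err
	}
	return exists.Load(), nil
}

func main() {
	found, err := existsConcurrently(context.Background(), []bool{false, true, false})
	fmt.Println(found, err) // true <nil>
}
```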

View file

@@ -63,8 +63,6 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
addr := object.AddressOf(prm.obj)
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
_, err := e.exists(ctx, addr)
if err != nil {
return err

View file

@@ -67,6 +67,12 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, ErrDegradedMode
}
select {
fyrchik marked this conversation as resolved (outdated)

Do all these `select` optimizations affect running speed? Asking because context expiry in `Exists()` is unlikely: it is the first step of a put, much faster than the expected operation time.

These `select`s are needed most for the blobovnicza tree. I think it is good practice to check `ctx.Done()` before any IO operation.
case <-ctx.Done():
return res, ctx.Err()
default:
}
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error {
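For readers unfamiliar with the idiom debated above: a `select` with a `default` branch polls the context without blocking, so each IO path can bail out as soon as the caller gives up. A minimal, self-contained illustration (the helper name is hypothetical, not part of this PR):

```go
package main

import (
	"context"
	"fmt"
)

// checkCtx names the pattern this PR inserts before each IO operation:
// a non-blocking poll of ctx.Done().
func checkCtx(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // context.Canceled or context.DeadlineExceeded
	default:
		return nil // no cancellation pending: proceed with the IO
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fmt.Println(checkCtx(ctx)) // <nil>
	cancel()
	fmt.Println(checkCtx(ctx)) // context canceled
}
```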

View file

@@ -47,6 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
))
defer span.End()
select {
case <-ctx.Done():
return ExistsRes{}, ctx.Err()
default:
}
var exists bool
var err error