engine: Check object existence concurrently #912

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:feat/engine_existance_concurrently into master 2024-01-23 07:16:33 +00:00
11 changed files with 124 additions and 35 deletions

View file

@ -21,10 +21,22 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
addrKey := addressKey(addr) addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error { err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if isNonDataBucket(bucketName) { if isNonDataBucket(bucketName) {
return nil return nil
} }

View file

@ -51,6 +51,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return GetRes{}, ctx.Err()
default:
}
var ( var (
data []byte data []byte
addrKey = addressKey(prm.addr) addrKey = addressKey(prm.addr)
@ -58,6 +64,12 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if isNonDataBucket(bucketName) { if isNonDataBucket(bucketName) {
return nil return nil
} }

View file

@ -94,6 +94,11 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
// //
// returns error if object could not be read from any blobovnicza of the same level. // returns error if object could not be read from any blobovnicza of the same level.
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) { func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
select {
case <-ctx.Done():
return common.GetRes{}, ctx.Err()
default:
}
// open blobovnicza (cached inside) // open blobovnicza (cached inside)
shBlz := b.getBlobovnicza(blzPath) shBlz := b.getBlobovnicza(blzPath)
blz, err := shBlz.Open() blz, err := shBlz.Open()

View file

@ -230,6 +230,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
} }
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
sysPath := filepath.Join(b.rootPath, path) sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath) entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
@ -252,6 +258,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
if len(dbIdxs) > 0 { if len(dbIdxs) > 0 {
for _, dbIdx := range dbIdxs { for _, dbIdx := range dbIdxs {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx)) dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath) stop, err := f(dbPath)
if err != nil { if err != nil {
@ -266,6 +278,12 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
if len(dirIdxs) > 0 { if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path)) hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs { for _, dirIdx := range dirIdxs {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
dirPath := filepath.Join(path, u64ToHexString(dirIdx)) dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f) stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil { if err != nil {

View file

@ -3,6 +3,7 @@ package blobstor
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -57,27 +58,30 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
// error | found | log the error, return true, nil // error | found | log the error, return true, nil
// error | not found | return the error // error | not found | return the error
// error | error | log the first error, return the second // error | error | log the first error, return the second
var errors []error var storageErrors []error
for i := range b.storage { for i := range b.storage {
res, err := b.storage[i].Storage.Exists(ctx, prm) res, err := b.storage[i].Storage.Exists(ctx, prm)
if err == nil && res.Exists { if err == nil && res.Exists {
exists = true exists = true
return res, nil return res, nil
} else if err != nil { } else if err != nil {
errors = append(errors, err) if errors.Is(err, context.Canceled) {
return common.ExistsRes{}, err
}
storageErrors = append(storageErrors, err)
} }
} }
if len(errors) == 0 { if len(storageErrors) == 0 {
return common.ExistsRes{}, nil return common.ExistsRes{}, nil
} }
for _, err := range errors[:len(errors)-1] { for _, err := range storageErrors[:len(storageErrors)-1] {
b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking, b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
zap.Stringer("address", prm.Address), zap.Stringer("address", prm.Address),
zap.String("error", err.Error()), zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} }
return common.ExistsRes{}, errors[len(errors)-1] return common.ExistsRes{}, storageErrors[len(storageErrors)-1]
} }

View file

@ -302,6 +302,12 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return common.ExistsRes{}, ctx.Err()
default:
}
p := t.treePath(prm.Address) p := t.treePath(prm.Address)
_, err := os.Stat(p) _, err := os.Stat(p)

View file

@ -39,6 +39,12 @@ func BenchmarkExists(b *testing.B) {
b.Run("8 shards", func(b *testing.B) { b.Run("8 shards", func(b *testing.B) {
benchmarkExists(b, 8) benchmarkExists(b, 8)
}) })
b.Run("12 shards", func(b *testing.B) {
benchmarkExists(b, 12)
})
b.Run("16 shards", func(b *testing.B) {
benchmarkExists(b, 12)
})
} }
func benchmarkExists(b *testing.B, shardNum int) { func benchmarkExists(b *testing.B, shardNum int) {

View file

@ -3,54 +3,70 @@ package engine
import ( import (
"context" "context"
"errors" "errors"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"golang.org/x/sync/errgroup"
) )
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) { func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
var shPrm shard.ExistsPrm var shPrm shard.ExistsPrm
shPrm.SetAddress(addr) shPrm.SetAddress(addr)
alreadyRemoved := false
exists := false var exists atomic.Bool
eg, egCtx := errgroup.WithContext(ctx)
egCtx, cancel := context.WithCancel(egCtx)
defer cancel()
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Exists(ctx, shPrm) select {
if err != nil { case <-egCtx.Done():
if client.IsErrObjectAlreadyRemoved(err) { return true
alreadyRemoved = true default:
return true
}
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return true
}
if shard.IsErrObjectExpired(err) {
return true
}
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return false
} }
if !exists { eg.Go(func() error {
exists = res.Exists() res, err := sh.Exists(egCtx, shPrm)
} if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
if client.IsErrObjectAlreadyRemoved(err) {
return err

Why not to perform cancel here too? Looks like the result will be obtained faster in this case.

Why not to perform `cancel` here too? Looks like the result will be obtained faster in this case.

Fixed, thanks

Fixed, thanks

Oh, no. It's ok. If eg.Go returns error, than egCtx will be cancelled by errgroup.

Oh, no. It's ok. If `eg.Go` returns error, than egCtx will be cancelled by errgroup.
}
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return err
}
if shard.IsErrObjectExpired(err) {
return err
}
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return nil
}
if res.Exists() {

How is it different from Store(true)? I would expect Store to have less overhead and more obvious, calling cancel() twice is allowed too. Don't mind using your approach too.

How is it different from `Store(true)`? I would expect `Store` to have less overhead and more obvious, calling `cancel()` twice is allowed too. Don't mind using your approach too.

cancel should be called only if object found, so we have to check res.Exists(). Or I don't understand your point.

`cancel` should be called only if object found, so we have to check `res.Exists()`. Or I don't understand your point.

I mean if res.Exists() { exists.Store(true); cancel() } instead of CompareAndSwap
In my approach we have estimated number of atomic operations equal to 1.

I mean `if res.Exists() { exists.Store(true); cancel() }` instead of `CompareAndSwap` In my approach we have estimated number of atomic operations equal to 1.

fixed.
Also dropped comment about concurrent exists.

fixed. Also dropped comment about concurrent exists.
exists.Store(true)
cancel()
}
return nil
})
return false return false
}) })
if alreadyRemoved { err := eg.Wait()
if client.IsErrObjectAlreadyRemoved(err) {
return false, new(apistatus.ObjectAlreadyRemoved) return false, new(apistatus.ObjectAlreadyRemoved)
} }
return exists, nil return exists.Load(), nil
} }

View file

@ -63,8 +63,6 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
addr := object.AddressOf(prm.obj) addr := object.AddressOf(prm.obj)
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
_, err := e.exists(ctx, addr) _, err := e.exists(ctx, addr)
if err != nil { if err != nil {
return err return err

View file

@ -67,6 +67,12 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, ErrDegradedMode return res, ErrDegradedMode
} }
select {
fyrchik marked this conversation as resolved Outdated

Do all these select optimizations affect running speed? Asking because context expiring in Exists() is not likely, it is the first step in put, much faster than the expected operation time.

Do all these `select` optimizations affect running speed? Asking because context expiring in `Exists()` is not likely, it is the first step in put, much faster than the expected operation time.

These select are most required for blobovnicza tree.
I think it is good practice to check ctx.Done() before any IO operation.

These `select` are most required for blobovnicza tree. I think it is good practice to check `ctx.Done()` before any IO operation.
case <-ctx.Done():
return res, ctx.Err()
default:
}
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error { err = db.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -47,6 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
)) ))
defer span.End() defer span.End()
select {
case <-ctx.Done():
return ExistsRes{}, ctx.Err()
default:
}
var exists bool var exists bool
var err error var err error