[#1450] engine: Group object by shard before Inhume

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
                                 │    old.txt     │               new2.txt               │
                                 │     sec/op     │    sec/op     vs base                │
InhumeMultipart/objects=1-12          11.42m ± 1%   10.73m ±  0%   -6.12% (p=0.000 n=10)
InhumeMultipart/objects=10-12        113.51m ± 0%   11.00m ±  1%  -90.31% (p=0.000 n=10)
InhumeMultipart/objects=100-12      1135.41m ± 1%   22.36m ±  1%  -98.03% (p=0.000 n=10)
InhumeMultipart/objects=1000-12    11357.77m ± 0%   41.97m ± 24%  -99.63% (p=0.000 n=10)
InhumeMultipart/objects=10000-12   113250.7m ± 0%   273.9m ±  3%  -99.76% (p=0.000 n=10)
geomean                                1.136        31.36m        -97.24%
```

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2024-11-18 14:40:10 +03:00
parent 503ebcdb9d
commit b947896005
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
2 changed files with 215 additions and 130 deletions

View file

@ -3,12 +3,12 @@ package engine
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -82,152 +82,182 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
}
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
defer elapsed("Inhume", e.metrics.AddMethodDuration)()
var retErr error
wg := sync.WaitGroup{}
errOnce := sync.Once{}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
loop:
for _, addr := range prm.addrs {
select {
case <-ctx.Done():
break loop
default:
}
wg.Add(1)
if err := e.inhumePool.Submit(func() {
defer wg.Done()
if err := e.handleInhumeTask(ctx, addr, prm.tombstone, prm.forceRemoval); err != nil {
errOnce.Do(func() {
retErr = err
cancel()
})
}
}); err != nil {
wg.Done()
cancel()
wg.Wait()
e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
return InhumeRes{}, errInhumeFailure
}
}
wg.Wait()
return InhumeRes{}, retErr
}
func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
if !forceRemoval {
locked, err := e.IsLocked(ctx, addr)
if err != nil {
e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
zap.Error(err),
zap.Stringer("addr", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else if locked {
return new(apistatus.ObjectLocked)
}
}
var prm shard.InhumePrm
if tombstone != nil {
prm.SetTarget(*tombstone, addr)
} else {
prm.MarkAsGarbage(addr)
}
if forceRemoval {
prm.ForceRemoval()
}
ok, err := e.inhumeAddr(ctx, addr, prm, true)
addrsPerShard, err := e.groupObjectsBeforeInhume(ctx, prm.addrs, !prm.forceRemoval)
if err != nil {
return err
}
if !ok {
ok, err := e.inhumeAddr(ctx, addr, prm, false)
if err != nil {
return err
}
if !ok {
return errInhumeFailure
}
return InhumeRes{}, err
}
return nil
}
// Returns ok if object was inhumed during this invocation or before.
func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm shard.InhumePrm, checkExists bool) (bool, error) {
root := false
var existPrm shard.ExistsPrm
var retErr error
var ok bool
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
defer func() {
// if object is root we continue since information about it
// can be presented in other shards
if checkExists && root {
stop = false
tasks := make([]util.WorkerTask, 0, len(addrsPerShard))
for shardID, addrIndexes := range addrsPerShard {
tasks = append(tasks, func(ctx context.Context) error {
addr := make([]oid.Address, len(addrIndexes))
for i, index := range addrIndexes {
addr[i] = prm.addrs[index]
}
}()
if checkExists {
existPrm.Address = addr
exRes, err := sh.Exists(ctx, existPrm)
if err != nil {
if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
// inhumed once - no need to be inhumed again
ok = true
return true
}
var siErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
e.reportShardError(ctx, sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
return
}
root = true
} else if !exRes.Exists() {
return
var inhumePrm shard.InhumePrm
if prm.tombstone != nil {
inhumePrm.SetTarget(*prm.tombstone, addr...)
} else {
inhumePrm.MarkAsGarbage(addr...)
}
if prm.forceRemoval {
inhumePrm.ForceRemoval()
}
}
_, err := sh.Inhume(ctx, prm)
if err != nil {
hs, exists := e.shards[shardID]
if !exists {
e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard,
zap.Error(errors.New("this shard was expected to exist")),
zap.String("shard_id", shardID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
return errInhumeFailure
}
_, err := hs.Shard.Inhume(ctx, inhumePrm)
var errLocked *apistatus.ObjectLocked
switch {
case errors.As(err, &errLocked):
retErr = new(apistatus.ObjectLocked)
return true
err = errLocked
case errors.Is(err, shard.ErrLockObjectRemoval):
retErr = meta.ErrLockObjectRemoval
return true
case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode):
retErr = err
return true
case errors.Is(err, shard.ErrReadOnlyMode):
case errors.Is(err, shard.ErrDegradedMode):
case err != nil:
e.reportShardError(ctx, hs, "couldn't inhume object in shard", err)
}
return err
})
}
e.reportShardError(ctx, sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
return false
err = util.ExecuteWithWorkerPool(ctx, e.inhumePool, tasks)
var errSubmit *util.WorkerPoolSubmitError
if errors.As(err, &errSubmit) {
e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
err = errInhumeFailure
}
return InhumeRes{}, err
}
// groupObjectsBeforeInhume groups objects by shard they're stored on.
func (e *StorageEngine) groupObjectsBeforeInhume(ctx context.Context, addrs []oid.Address, checkLocked bool) (map[string][]int, error) {
type addrLocation struct {
addrIndex int
shardID string
}
locations := make(chan addrLocation, e.inhumePoolSize)
groups := make(map[string][]int)
groupingDone := make(chan struct{})
go func() {
defer close(groupingDone)
for loc := range locations {
groups[loc.shardID] = append(groups[loc.shardID], loc.addrIndex)
}
}()
tasks := make([]util.WorkerTask, len(addrs))
for i, addr := range addrs {
tasks[i] = func(ctx context.Context) (err error) {
ids, err := e.findShards(ctx, addr, checkLocked)
if err != nil {
return err
}
for _, id := range ids {
locations <- addrLocation{addrIndex: i, shardID: id}
}
return nil
}
}
err := util.ExecuteWithWorkerPool(ctx, e.inhumePool, tasks)
close(locations)
<-groupingDone
if err != nil {
var errSubmit *util.WorkerPoolSubmitError
if errors.As(err, &errSubmit) {
e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
return nil, errInhumeFailure
}
return nil, err
}
return groups, nil
}
// findShards returns a shard which this object is stored on,
// and returns multiple shards if it's a root object.
func (e *StorageEngine) findShards(ctx context.Context, addr oid.Address, checkLocked bool) ([]string, error) {
var (
ids []string
retErr error
prm shard.ExistsPrm
siErr *objectSDK.SplitInfoError
ecErr *objectSDK.ECInfoError
isRootObject = false
)
e.iterateOverSortedShards(addr, func(_ int, hs hashedShard) (stop bool) {
objectExists := false
prm.Address = addr
switch res, err := hs.Shard.Exists(ctx, prm); {
case client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err):
// Don't report the error and just keep going.
case errors.As(err, &siErr) || errors.As(err, &ecErr):
isRootObject = true
objectExists = true
case err != nil:
e.reportShardError(
ctx, hs, "couldn't check for presence in shard",
err, zap.Stringer("address", addr),
)
case res.Exists():
objectExists = true
default:
}
ok = true
return true
if !objectExists {
return
}
// TODO(@a-savchuk): `shard.(*Shard).Exists` can check if an object
// is locked. I wanted to use it here, but then I found out that it
// checks if the PARENT object is locked, not the object passed.
// This check is used in one place only, in `(*StorageEngine).Put`.
//
// In the future, if `shard.(*Shard).Exists` is updated to check
// the lock status of the specific object, we could use it instead of
// `shard.(*Shard).IsLocked`.
if checkLocked {
if isLocked, err := hs.Shard.IsLocked(ctx, addr); err != nil {
e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
zap.Error(err),
zap.Stringer("address", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
} else if isLocked {
retErr = new(apistatus.ObjectLocked)
return true
}
}
ids = append(ids, hs.ID().String())
// Continue if it's a root object.
return !isRootObject
})
return ok, retErr
if retErr != nil {
return nil, retErr
}
return ids, nil
}
// IsLocked checks whether an object is locked according to StorageEngine's state.

View file

@ -1,6 +1,8 @@
package util
import (
"context"
"sync"
"sync/atomic"
"github.com/panjf2000/ants/v2"
@ -53,3 +55,56 @@ func (p *pseudoWorkerPool) Submit(fn func()) error {
func (p *pseudoWorkerPool) Release() {
p.closed.Store(true)
}
type WorkerTask func(ctx context.Context) error
type WorkerPoolSubmitError struct {
err error
}
func (e *WorkerPoolSubmitError) Error() string {
return e.err.Error()
}
func (e *WorkerPoolSubmitError) Unwrap() error {
return e.err
}
// ExecuteWithWorkerPool runs tasks in parallel using a pool and waits for all
// tasks to be complete.
//
// Returns [WorkerPoolSubmitError] when it couldn't submit a task.
func ExecuteWithWorkerPool(ctx context.Context, pool WorkerPool, tasks []WorkerTask) error {
taskCtx, taskCancel := context.WithCancelCause(ctx)
defer taskCancel(nil)
var wg sync.WaitGroup
loop:
for _, task := range tasks {
select {
case <-ctx.Done():
taskCancel(context.Cause(ctx))
break loop
default:
}
wg.Add(1)
if err := pool.Submit(func() {
defer wg.Done()
if err := task(taskCtx); err != nil {
taskCancel(err)
}
}); err != nil {
wg.Done()
taskCancel(err)
wg.Wait()
return &WorkerPoolSubmitError{err}
}
}
wg.Wait()
return context.Cause(taskCtx)
}