Compare commits


2 commits

59fc75de0b
[#1450] engine: Make Inhume operation handle objects in parallel
Add a worker pool for the `Inhume` operation and use it to handle objects
in parallel. Since the metabase `Inhume` uses `bbolt.Batch`, handling many
objects one by one may be inefficient.

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
                                 │    old.txt    │               new.txt                │
                                 │    sec/op     │    sec/op     vs base                │
InhumeMultipart/objects=1-12         11.45m ± 0%   11.43m ±  0%        ~ (p=0.315 n=10)
InhumeMultipart/objects=10-12       113.94m ± 1%   11.60m ±  0%  -89.82% (p=0.000 n=10)
InhumeMultipart/objects=100-12     1144.08m ± 1%   30.17m ± 12%  -97.36% (p=0.000 n=10)
InhumeMultipart/objects=1000-12    11367.5m ± 0%   265.7m ±  2%  -97.66% (p=0.000 n=10)
InhumeMultipart/objects=10000-12    113.505 ± 0%    2.436 ±  1%  -97.85% (p=0.000 n=10)
geomean                               1.140        76.32m        -93.31%
```

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-15 00:50:53 +03:00
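The rationale above hinges on `bbolt.Batch` semantics: `Batch` coalesces write transactions issued by *concurrent* callers into a single bolt transaction, so it only pays off when many goroutines call it at once. A minimal standalone sketch of that behavior (demo code, not from this diff; the bucket name and file path are made up):

```go
package main

import (
	"fmt"
	"sync"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Open a throwaway database file (demo path).
	db, err := bolt.Open("demo.db", 0o600, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Batch groups writes from concurrent goroutines into one
			// bolt transaction: roughly one commit/fsync instead of 100.
			// With a single sequential caller it degenerates to Update,
			// which is why one-by-one inhuming was slow.
			err := db.Batch(func(tx *bolt.Tx) error {
				b, err := tx.CreateBucketIfNotExists([]byte("graveyard"))
				if err != nil {
					return err
				}
				return b.Put([]byte(fmt.Sprintf("obj-%d", i)), []byte("tombstone"))
			})
			if err != nil {
				fmt.Println("batch:", err)
			}
		}(i)
	}
	wg.Wait()
}
```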
6eb0ae3be9
[#1450] engine: Add benchmark for Inhume operation
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-15 00:15:19 +03:00
4 changed files with 158 additions and 30 deletions


```diff
@@ -11,6 +11,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -71,10 +72,14 @@ func (e *StorageEngine) open(ctx context.Context) error {
 }
 
 // Init initializes all StorageEngine's components.
-func (e *StorageEngine) Init(ctx context.Context) error {
+func (e *StorageEngine) Init(ctx context.Context) (err error) {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 
+	if e.inhumePool, err = ants.NewPool(int(e.inhumePoolSize)); err != nil {
+		return fmt.Errorf("could not create pool: %w", err)
+	}
+
 	errCh := make(chan shardInitError, len(e.shards))
 	var eg errgroup.Group
 	if e.cfg.lowMem && e.anyShardRequiresRefill() {
@@ -92,7 +97,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
 			return nil
 		})
 	}
-	err := eg.Wait()
+	err = eg.Wait()
 	close(errCh)
 	if err != nil {
 		return fmt.Errorf("failed to initialize shards: %w", err)
@@ -161,6 +166,7 @@ func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
 		for _, p := range e.shardPools {
 			p.Release()
 		}
+		e.inhumePool.Release()
 	}
 
 	for id, sh := range e.shards {
```

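For reference, the `ants` worker-pool lifecycle mirrored by `Init` and `close` above, as a tiny self-contained sketch (demo values; the pool size of 4 stands in for `inhumePoolSize`):

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Bounded pool, analogous to ants.NewPool(int(e.inhumePoolSize)).
	pool, err := ants.NewPool(4)
	if err != nil {
		panic(err)
	}
	// Mirrors e.inhumePool.Release() in StorageEngine.close.
	defer pool.Release()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		task := i
		// Submit can fail (e.g. after the pool is released), which is
		// why the engine's inhume loop checks its error and undoes the
		// corresponding wg.Add.
		if err := pool.Submit(func() {
			defer wg.Done()
			fmt.Println("task", task)
		}); err != nil {
			wg.Done()
			fmt.Println("submit:", err)
		}
	}
	wg.Wait()
}
```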

```diff
@@ -30,6 +30,8 @@ type StorageEngine struct {
 	shardPools map[string]util.WorkerPool
 
+	inhumePool util.WorkerPool
+
 	closeCh   chan struct{}
 	setModeCh chan setModeRequest
 	wg        sync.WaitGroup
@@ -191,6 +193,8 @@ type cfg struct {
 	shardPoolSize uint32
 
+	inhumePoolSize uint32
+
 	lowMem bool
 
 	containerSource atomic.Pointer[containerSource]
@@ -198,9 +202,10 @@ type cfg struct {
 func defaultCfg() *cfg {
 	res := &cfg{
-		log:           logger.NewLoggerWrapper(zap.L()),
-		shardPoolSize: 20,
-		metrics:       noopMetrics{},
+		log:            logger.NewLoggerWrapper(zap.L()),
+		shardPoolSize:  20,
+		inhumePoolSize: 50,
+		metrics:        noopMetrics{},
 	}
 	res.containerSource.Store(&containerSource{})
 	return res
@@ -244,6 +249,13 @@ func WithShardPoolSize(sz uint32) Option {
 	}
 }
 
+// WithInhumePoolSize returns option to specify size of worker pool for Inhume operation.
+func WithInhumePoolSize(sz uint32) Option {
+	return func(c *cfg) {
+		c.inhumePoolSize = sz
+	}
+}
+
 // WithErrorThreshold returns an option to specify size amount of errors after which
 // shard is moved to read-only mode.
 func WithErrorThreshold(sz uint32) Option {
```

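A hedged sketch of how a caller might wire the new option through the engine's functional-options constructor. The `engine.New(opts...)` shape is assumed here from package conventions (the test helper `testNewEngine` in the last file passes options the same way), not confirmed by this diff:

```go
package main

import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)

func main() {
	// Assumed constructor shape; options as introduced by this commit.
	e := engine.New(
		engine.WithShardPoolSize(20),
		engine.WithInhumePoolSize(100), // new option; defaults to 50
	)
	_ = e
}
```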

```diff
@@ -3,6 +3,7 @@ package engine
 import (
 	"context"
 	"errors"
+	"sync"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -81,45 +82,93 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
 }
 
 func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 	defer elapsed("Inhume", e.metrics.AddMethodDuration)()
 
-	var shPrm shard.InhumePrm
-	if prm.forceRemoval {
-		shPrm.ForceRemoval()
-	}
+	var retErr error
+
+	wg := sync.WaitGroup{}
+	errOnce := sync.Once{}
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 
-	for i := range prm.addrs {
-		if !prm.forceRemoval {
-			locked, err := e.IsLocked(ctx, prm.addrs[i])
-			if err != nil {
-				e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
-					zap.Error(err),
-					zap.Stringer("addr", prm.addrs[i]),
-					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			} else if locked {
-				return InhumeRes{}, new(apistatus.ObjectLocked)
-			}
-		}
+loop:
+	for _, addr := range prm.addrs {
+		select {
+		case <-ctx.Done():
+			break loop
+		default:
+		}
 
-		if prm.tombstone != nil {
-			shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
-		} else {
-			shPrm.MarkAsGarbage(prm.addrs[i])
-		}
+		wg.Add(1)
+		if err := e.inhumePool.Submit(func() {
+			defer wg.Done()
+
+			err := e.handleInhumeTask(ctx, addr, prm.tombstone, prm.forceRemoval)
+			if err != nil {
+				errOnce.Do(func() {
+					retErr = err
+					cancel()
+				})
+			}
+		}); err != nil {
+			wg.Done()
+			cancel()
+			wg.Wait()
 
-		ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true)
-		if err != nil {
-			return InhumeRes{}, err
-		}
-		if !ok {
-			ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
-			if err != nil {
-				return InhumeRes{}, err
-			} else if !ok {
-				return InhumeRes{}, errInhumeFailure
-			}
-		}
+			e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
+			return InhumeRes{}, errInhumeFailure
+		}
 	}
+	wg.Wait()
 
-	return InhumeRes{}, nil
+	return InhumeRes{}, retErr
+}
+
+func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
+	if !forceRemoval {
+		locked, err := e.IsLocked(ctx, addr)
+		if err != nil {
+			e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
+				zap.Error(err),
+				zap.Stringer("addr", addr),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		} else if locked {
+			return new(apistatus.ObjectLocked)
+		}
+	}
+
+	var prm shard.InhumePrm
+	if tombstone != nil {
+		prm.SetTarget(*tombstone, addr)
+	} else {
+		prm.MarkAsGarbage(addr)
+	}
+	if forceRemoval {
+		prm.ForceRemoval()
+	}
+
+	ok, err := e.inhumeAddr(ctx, addr, prm, true)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		ok, err := e.inhumeAddr(ctx, addr, prm, false)
+		if err != nil {
+			return err
+		}
+		if !ok {
+			return errInhumeFailure
+		}
+	}
+	return nil
 }
 
 // Returns ok if object was inhumed during this invocation or before.
```

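The control flow above is a "first error wins" fan-out: each address becomes a task on the bounded pool, only the first failure is recorded via `sync.Once`, and the shared context is canceled so queued tasks exit early instead of doing useless work. A generic, self-contained sketch of the same pattern (plain goroutines stand in for the ants pool; names are illustrative, not from the PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// fanOut runs tasks concurrently, keeps only the first error, and
// cancels the shared context so not-yet-started work can bail out.
func fanOut(ctx context.Context, tasks []func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg      sync.WaitGroup
		errOnce sync.Once
		retErr  error
	)
	for _, task := range tasks {
		select {
		case <-ctx.Done(): // a task already failed; stop submitting
			wg.Wait()
			return retErr
		default:
		}

		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				errOnce.Do(func() { // first error wins
					retErr = err
					cancel()
				})
			}
		}(task)
	}
	wg.Wait()
	return retErr
}

func main() {
	err := fanOut(context.Background(), []func(context.Context) error{
		func(context.Context) error { return nil },
		func(context.Context) error { return errors.New("boom") },
	})
	fmt.Println(err) // prints "boom"
}
```

The PR uses a shared `ants` pool plus `sync.Once` rather than, say, `errgroup`, presumably because the pool is shared across concurrent `Inhume` calls and `Submit` itself can fail; the sketch mirrors only the error and cancellation logic.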

```diff
@@ -2,6 +2,7 @@ package engine
 
 import (
 	"context"
+	"fmt"
 	"testing"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -9,7 +10,11 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
+	objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 )
 
 func TestStorageEngine_Inhume(t *testing.T) {
@@ -84,3 +89,59 @@ func TestStorageEngine_Inhume(t *testing.T) {
 		require.Empty(t, addrs)
 	})
 }
+
+func BenchmarkInhumeMultipart(b *testing.B) {
+	// The benchmark result insignificantly depends on the number of shards,
+	// so do not use it as a benchmark parameter, just set it big enough.
+	numShards := 100
+
+	for numObjects := 1; numObjects <= 10000; numObjects *= 10 {
+		b.Run(
+			fmt.Sprintf("objects=%d", numObjects),
+			func(b *testing.B) {
+				benchmarkInhumeMultipart(b, numShards, numObjects)
+			},
+		)
+	}
+}
+
+func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
+	b.StopTimer()
+
+	engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
+		setShardsNum(b, numShards).prepare(b).engine
+	defer func() { require.NoError(b, engine.Close(context.Background())) }()
+
+	cnt := cidtest.ID()
+	eg := errgroup.Group{}
+
+	for range b.N {
+		addrs := make([]oid.Address, numObjects)
+
+		for i := range numObjects {
+			prm := PutPrm{}
+
+			prm.Object = objecttest.Object().Parent()
+			prm.Object.SetContainerID(cnt)
+			prm.Object.SetType(objectSDK.TypeRegular)
+
+			addrs[i] = object.AddressOf(prm.Object)
+
+			eg.Go(func() error {
+				return engine.Put(context.Background(), prm)
+			})
+		}
+		require.NoError(b, eg.Wait())
+
+		ts := oidtest.Address()
+		ts.SetContainer(cnt)
+
+		prm := InhumePrm{}
+		prm.WithTarget(ts, addrs...)
+
+		b.StartTimer()
+		_, err := engine.Inhume(context.Background(), prm)
+		require.NoError(b, err)
+		b.StopTimer()
+	}
+}
```
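To reproduce the benchstat table in the first commit message, one would typically run the benchmark ten times per revision, e.g. `go test -run '^$' -bench BenchmarkInhumeMultipart -count 10 ./pkg/local_object_storage/engine/ > old.txt` (and `new.txt` on the patched revision), then compare with `benchstat old.txt new.txt`. These are standard Go tooling commands, not part of the diff; the `n=10` and p-values in the table come from benchstat's significance test.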