forked from TrueCloudLab/frostfs-node

Compare commits: 59fc75de0b ... b19cf31440 (2 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | b19cf31440 |  |
|  | b62919f595 |  |

4 changed files with 157 additions and 30 deletions
```diff
@@ -11,6 +11,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+    "github.com/panjf2000/ants/v2"
     "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
 )
@@ -71,10 +72,14 @@ func (e *StorageEngine) open(ctx context.Context) error {
 }
 
 // Init initializes all StorageEngine's components.
-func (e *StorageEngine) Init(ctx context.Context) error {
+func (e *StorageEngine) Init(ctx context.Context) (err error) {
     e.mtx.Lock()
     defer e.mtx.Unlock()
 
+    if e.inhumePool, err = ants.NewPool(int(e.inhumePoolSize)); err != nil {
+        return fmt.Errorf("could not create pool: %w", err)
+    }
+
     errCh := make(chan shardInitError, len(e.shards))
     var eg errgroup.Group
     if e.cfg.lowMem && e.anyShardRequiresRefill() {
@@ -92,7 +97,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
             return nil
         })
     }
-    err := eg.Wait()
+    err = eg.Wait()
     close(errCh)
     if err != nil {
         return fmt.Errorf("failed to initialize shards: %w", err)
@@ -161,6 +166,7 @@ func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
         for _, p := range e.shardPools {
             p.Release()
         }
+        e.inhumePool.Release()
     }
 
     for id, sh := range e.shards {
```
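Side note on the pool introduced in `Init`: the diff relies on `github.com/panjf2000/ants/v2`, whose pool is created once, fed with `Submit`, and torn down with `Release`. Below is a minimal, self-contained sketch of that lifecycle; the pool size 50 and the error wrapping mirror the diff, everything else is illustrative and not code from this repository.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Create a bounded worker pool, as Init now does with inhumePoolSize.
	pool, err := ants.NewPool(50)
	if err != nil {
		panic(fmt.Errorf("could not create pool: %w", err))
	}
	// Release the pool on shutdown, as close() now does.
	defer pool.Release()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		// Submit hands the task to a free worker; by default it blocks while
		// all workers are busy, which is what bounds concurrency.
		if err := pool.Submit(func() {
			defer wg.Done()
			fmt.Println("one unit of work done")
		}); err != nil {
			// Submission can fail (e.g. after Release); undo the Add.
			wg.Done()
			fmt.Println("submit failed:", err)
		}
	}
	wg.Wait()
}
```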
```diff
@@ -30,6 +30,8 @@ type StorageEngine struct {
 
     shardPools map[string]util.WorkerPool
 
+    inhumePool util.WorkerPool
+
     closeCh   chan struct{}
     setModeCh chan setModeRequest
     wg        sync.WaitGroup
@@ -191,6 +193,8 @@ type cfg struct {
 
     shardPoolSize uint32
 
+    inhumePoolSize uint32
+
     lowMem bool
 
     containerSource atomic.Pointer[containerSource]
@@ -198,9 +202,10 @@ type cfg struct {
 
 func defaultCfg() *cfg {
     res := &cfg{
-        log:           logger.NewLoggerWrapper(zap.L()),
-        shardPoolSize: 20,
-        metrics:       noopMetrics{},
+        log:            logger.NewLoggerWrapper(zap.L()),
+        shardPoolSize:  20,
+        inhumePoolSize: 50,
+        metrics:        noopMetrics{},
     }
     res.containerSource.Store(&containerSource{})
     return res
@@ -244,6 +249,13 @@ func WithShardPoolSize(sz uint32) Option {
     }
 }
 
+// WithInhumePoolSize returns option to specify size of worker pool for Inhume operation.
+func WithInhumePoolSize(sz uint32) Option {
+    return func(c *cfg) {
+        c.inhumePoolSize = sz
+    }
+}
+
 // WithErrorThreshold returns an option to specify size amount of errors after which
 // shard is moved to read-only mode.
 func WithErrorThreshold(sz uint32) Option {
```
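The new `WithInhumePoolSize` follows the package's existing functional-options pattern. The toy, self-contained re-creation below shows how such an option overrides the default of 50 set in `defaultCfg`; names mirror the diff, but this is not the engine's actual constructor.

```go
package main

import "fmt"

// cfg and Option re-create the shape used by the engine package:
// each Option mutates a private cfg before the component is built.
type cfg struct {
	shardPoolSize  uint32
	inhumePoolSize uint32
}

type Option func(*cfg)

func WithInhumePoolSize(sz uint32) Option {
	return func(c *cfg) { c.inhumePoolSize = sz }
}

func defaultCfg() *cfg {
	return &cfg{shardPoolSize: 20, inhumePoolSize: 50}
}

// newEngineCfg is a hypothetical stand-in for the engine constructor:
// apply defaults first, then every supplied option in order.
func newEngineCfg(opts ...Option) *cfg {
	c := defaultCfg()
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	fmt.Println(newEngineCfg().inhumePoolSize)                        // 50 (default)
	fmt.Println(newEngineCfg(WithInhumePoolSize(200)).inhumePoolSize) // 200 (overridden)
}
```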
```diff
@@ -3,6 +3,7 @@ package engine
 import (
     "context"
     "errors"
+    "sync"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -81,45 +82,93 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
 }
 
 func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
     defer elapsed("Inhume", e.metrics.AddMethodDuration)()
 
-    var shPrm shard.InhumePrm
-    if prm.forceRemoval {
-        shPrm.ForceRemoval()
-    }
-
-    for i := range prm.addrs {
-        if !prm.forceRemoval {
-            locked, err := e.IsLocked(ctx, prm.addrs[i])
-            if err != nil {
-                e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
-                    zap.Error(err),
-                    zap.Stringer("addr", prm.addrs[i]),
-                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-            } else if locked {
-                return InhumeRes{}, new(apistatus.ObjectLocked)
-            }
-        }
-
-        if prm.tombstone != nil {
-            shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
-        } else {
-            shPrm.MarkAsGarbage(prm.addrs[i])
-        }
-
-        ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true)
-        if err != nil {
-            return InhumeRes{}, err
-        }
-        if !ok {
-            ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
-            if err != nil {
-                return InhumeRes{}, err
-            } else if !ok {
-                return InhumeRes{}, errInhumeFailure
-            }
-        }
-    }
-
-    return InhumeRes{}, nil
+    var retErr error
+
+    wg := sync.WaitGroup{}
+    errOnce := sync.Once{}
+
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
+loop:
+    for _, addr := range prm.addrs {
+        select {
+        case <-ctx.Done():
+            break loop
+        default:
+        }
+
+        wg.Add(1)
+        if err := e.inhumePool.Submit(func() {
+            defer wg.Done()
+
+            err := e.handleInhumeTask(ctx, addr, prm.tombstone, prm.forceRemoval)
+            if err != nil {
+                errOnce.Do(func() {
+                    retErr = err
+                    cancel()
+                })
+            }
+        }); err != nil {
+            wg.Done()
+            cancel()
+            wg.Wait()
+
+            e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
+            return InhumeRes{}, errInhumeFailure
+        }
+    }
+    wg.Wait()
+
+    return InhumeRes{}, retErr
+}
+
+func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
+    if !forceRemoval {
+        locked, err := e.IsLocked(ctx, addr)
+        if err != nil {
+            e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
+                zap.Error(err),
+                zap.Stringer("addr", addr),
+                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+        } else if locked {
+            return new(apistatus.ObjectLocked)
+        }
+    }
+
+    var prm shard.InhumePrm
+
+    if tombstone != nil {
+        prm.SetTarget(*tombstone, addr)
+    } else {
+        prm.MarkAsGarbage(addr)
+    }
+    if forceRemoval {
+        prm.ForceRemoval()
+    }
+
+    ok, err := e.inhumeAddr(ctx, addr, prm, true)
+    if err != nil {
+        return err
+    }
+    if !ok {
+        ok, err := e.inhumeAddr(ctx, addr, prm, false)
+        if err != nil {
+            return err
+        }
+        if !ok {
+            return errInhumeFailure
+        }
+    }
+
+    return nil
 }
 
 // Returns ok if object was inhumed during this invocation or before.
```
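The restructured `inhume()` combines three pieces: a bounded worker pool, a `sync.Once` that records only the first error, and context cancellation so that no further addresses are submitted once something fails. Below is a stand-alone sketch of that coordination pattern under the assumption of Go 1.22+ loop-variable semantics; `processAll` and `handle` are hypothetical stand-ins, not repository code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

// processAll fans work out to a bounded pool, keeps only the first error via
// sync.Once, and cancels the shared context so the loop stops submitting new
// items after a failure (tasks already submitted still finish).
func processAll(ctx context.Context, pool *ants.Pool, items []int, handle func(context.Context, int) error) error {
	var (
		retErr  error
		wg      sync.WaitGroup
		errOnce sync.Once
	)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

loop:
	for _, it := range items {
		select {
		case <-ctx.Done():
			break loop // first error already stopped the batch
		default:
		}

		wg.Add(1)
		if err := pool.Submit(func() {
			defer wg.Done()
			if err := handle(ctx, it); err != nil {
				errOnce.Do(func() {
					retErr = err
					cancel()
				})
			}
		}); err != nil {
			// Submission itself failed: undo the Add, stop the batch,
			// wait for running tasks, and report the failure.
			wg.Done()
			cancel()
			wg.Wait()
			return fmt.Errorf("submit: %w", err)
		}
	}
	wg.Wait()

	return retErr
}

func main() {
	pool, err := ants.NewPool(4)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	err = processAll(context.Background(), pool, []int{1, 2, 3, 4, 5}, func(_ context.Context, i int) error {
		if i == 3 {
			return errors.New("boom")
		}
		return nil
	})
	fmt.Println(err) // "boom"; items not yet submitted at cancellation time are skipped
}
```

Reading `retErr` after `wg.Wait()` is safe here because the only write happens inside `errOnce.Do` before the corresponding `wg.Done()`, the same ordering the new `inhume()` relies on.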
```diff
@@ -2,6 +2,7 @@ package engine
 
 import (
     "context"
+    "fmt"
     "testing"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -9,7 +10,11 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
     cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
+    objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
     "github.com/stretchr/testify/require"
+    "golang.org/x/sync/errgroup"
 )
 
 func TestStorageEngine_Inhume(t *testing.T) {
@@ -84,3 +89,58 @@ func TestStorageEngine_Inhume(t *testing.T) {
         require.Empty(t, addrs)
     })
 }
+
+func BenchmarkInhumeMultipart(b *testing.B) {
+    // The benchmark result insignificantly depends on the number of shards,
+    // so doesn't use this as a benchmark parameter, just set it big enough.
+    numShards := 100
+
+    for numObjects := 1; numObjects <= 10000; numObjects *= 10 {
+        b.Run(fmt.Sprintf("%d_objects", numObjects), func(b *testing.B) {
+            benchmarkInhumeMultipart(b, numShards, numObjects)
+        })
+    }
+}
+
+func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
+    b.StopTimer()
+
+    for range b.N {
+        engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
+            setShardsNum(b, numShards).
+            prepare(b).
+            engine
+
+        cnt := cidtest.ID()
+        var addrs []oid.Address
+
+        eg := errgroup.Group{}
+
+        for range numObjects {
+            prm := PutPrm{}
+
+            prm.Object = objecttest.Object().Parent()
+            prm.Object.SetContainerID(cnt)
+            prm.Object.SetType(objectSDK.TypeRegular)
+            addrs = append(addrs, object.AddressOf(prm.Object))
+
+            eg.Go(func() error {
+                return engine.Put(context.Background(), prm)
+            })
+        }
+        require.NoError(b, eg.Wait())
+
+        ts := oidtest.Address()
+        ts.SetContainer(cnt)
+
+        prm := InhumePrm{}
+        prm.WithTarget(ts, addrs...)
+
+        b.StartTimer()
+        _, err := engine.Inhume(context.Background(), prm)
+        require.NoError(b, err)
+        b.StopTimer()
+
+        require.NoError(b, engine.Close(context.Background()))
+    }
+}
```
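The added benchmark keeps setup (engine construction and concurrent `Put` calls) outside the measured region by pairing `b.StopTimer`/`b.StartTimer` with one sub-benchmark per object count. A stripped-down sketch of that pattern, illustrative only and not repository code:

```go
package bench

import (
	"fmt"
	"testing"
	"time"
)

// BenchmarkPattern mirrors the structure of BenchmarkInhumeMultipart:
// sub-benchmarks per input size, timer stopped while fixtures are built,
// so only the measured call contributes to the reported ns/op.
func BenchmarkPattern(b *testing.B) {
	for n := 1; n <= 1000; n *= 10 {
		b.Run(fmt.Sprintf("%d_objects", n), func(b *testing.B) {
			b.StopTimer()
			for range b.N {
				// Expensive setup excluded from the measurement
				// (the real benchmark creates an engine and puts n objects here).
				setup := make([]int, n)

				b.StartTimer()
				work(setup) // the operation under test (Inhume in the real benchmark)
				b.StopTimer()
			}
		})
	}
}

// work stands in for the measured operation.
func work(items []int) {
	time.Sleep(time.Duration(len(items)) * time.Microsecond)
}
```

Assuming a standard Go toolchain, something like `go test -run '^$' -bench BenchmarkInhumeMultipart` in the engine package's directory would run only the new benchmark.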