[#1667] shard: Drop shard pool

After adding an ops limiter, shard's `put` pool is redundant.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-03-10 13:04:39 +03:00
parent 597bce7a87
commit 2005fdda09
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
20 changed files with 71 additions and 171 deletions

View file

@ -153,16 +153,10 @@ func (e *StorageEngine) Close(ctx context.Context) error {
}
// closes all shards. Never returns an error, shard errors are logged.
func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
func (e *StorageEngine) close(ctx context.Context) error {
e.mtx.RLock()
defer e.mtx.RUnlock()
if releasePools {
for _, p := range e.shardPools {
p.Release()
}
}
for id, sh := range e.shards {
if err := sh.Close(ctx); err != nil {
e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
@ -213,7 +207,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
return e.open(ctx)
}
} else if prevErr == nil { // ok -> block
return e.close(ctx, errors.Is(err, errClosed))
return e.close(ctx)
}
// otherwise do nothing

View file

@ -245,7 +245,6 @@ func TestReload(t *testing.T) {
// no new paths => no new shards
require.Equal(t, shardNum, len(e.shards))
require.Equal(t, shardNum, len(e.shardPools))
newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))
@ -257,7 +256,6 @@ func TestReload(t *testing.T) {
require.NoError(t, e.Reload(context.Background(), rcfg))
require.Equal(t, shardNum+1, len(e.shards))
require.Equal(t, shardNum+1, len(e.shardPools))
require.NoError(t, e.Close(context.Background()))
})
@ -277,7 +275,6 @@ func TestReload(t *testing.T) {
// removed one
require.Equal(t, shardNum-1, len(e.shards))
require.Equal(t, shardNum-1, len(e.shardPools))
require.NoError(t, e.Close(context.Background()))
})
@ -311,7 +308,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
}
require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools))
return e, currShards
}

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -29,8 +28,6 @@ type StorageEngine struct {
shards map[string]hashedShard
shardPools map[string]util.WorkerPool
closeCh chan struct{}
setModeCh chan setModeRequest
wg sync.WaitGroup
@ -193,8 +190,6 @@ type cfg struct {
metrics MetricRegister
shardPoolSize uint32
lowMem bool
containerSource atomic.Pointer[containerSource]
@ -202,9 +197,8 @@ type cfg struct {
func defaultCfg() *cfg {
res := &cfg{
log: logger.NewLoggerWrapper(zap.L()),
shardPoolSize: 20,
metrics: noopMetrics{},
log: logger.NewLoggerWrapper(zap.L()),
metrics: noopMetrics{},
}
res.containerSource.Store(&containerSource{})
return res
@ -221,7 +215,6 @@ func New(opts ...Option) *StorageEngine {
return &StorageEngine{
cfg: c,
shards: make(map[string]hashedShard),
shardPools: make(map[string]util.WorkerPool),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{},
@ -241,13 +234,6 @@ func WithMetrics(v MetricRegister) Option {
}
}
// WithShardPoolSize returns option to specify size of worker pool for each shard.
func WithShardPoolSize(sz uint32) Option {
return func(c *cfg) {
c.shardPoolSize = sz
}
}
// WithErrorThreshold returns an option to specify size amount of errors after which
// shard is moved to read-only mode.
func WithErrorThreshold(sz uint32) Option {

View file

@ -57,7 +57,6 @@ func (te *testEngineWrapper) setShardsNumOpts(
te.shardIDs[i] = shard.ID()
}
require.Len(t, te.engine.shards, num)
require.Len(t, te.engine.shardPools, num)
return te
}

View file

@ -46,7 +46,6 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
var testShards [2]*testShard
te := testNewEngine(t,
WithShardPoolSize(1),
WithErrorThreshold(errThreshold),
).
setShardsNumOpts(t, 2, func(id int) []shard.Option {

View file

@ -15,7 +15,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
@ -201,11 +200,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res
}
type pooledShard struct {
hashedShard
pool util.WorkerPool
}
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
@ -252,7 +246,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
}
var mtx sync.RWMutex
copyShards := func() []pooledShard {
copyShards := func() []hashedShard {
mtx.RLock()
defer mtx.RUnlock()
t := slices.Clone(shards)
@ -266,7 +260,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@ -388,7 +382,7 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
}
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
@ -412,7 +406,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.Cancel
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
sh := shardsToEvacuate[shardID]
@ -485,7 +479,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
getShards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
shards := getShards()
@ -515,7 +509,7 @@ func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string,
}
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
prm EvacuateShardPrm, res *EvacuateShardRes, shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
trace.WithAttributes(
@ -583,7 +577,7 @@ func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.S
}
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
prm EvacuateShardPrm, shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
) (bool, string, error) {
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, shardsToEvacuate)
if err != nil {
@ -653,15 +647,15 @@ func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shar
// findShardToEvacuateTree returns first shard according HRW or first shard with tree exists.
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) (pooledShard, bool, error) {
shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
) (hashedShard, bool, error) {
hrw.SortHasherSliceByValue(shards, hrw.StringHash(tree.CID.EncodeToString()))
var result pooledShard
var result hashedShard
var found bool
for _, target := range shards {
select {
case <-ctx.Done():
return pooledShard{}, false, ctx.Err()
return hashedShard{}, false, ctx.Err()
default:
}
@ -689,7 +683,7 @@ func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilora
return result, found, nil
}
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, error) {
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]hashedShard, error) {
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -719,18 +713,15 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
shards := make([]pooledShard, 0, len(e.shards))
shards := make([]hashedShard, 0, len(e.shards))
for id := range e.shards {
shards = append(shards, pooledShard{
hashedShard: e.shards[id],
pool: e.shardPools[id],
})
shards = append(shards, e.shards[id])
}
return shards, nil
}
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
getShards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
defer span.End()
@ -800,7 +791,7 @@ func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
shards []hashedShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
) (bool, error) {
hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))
for j := range shards {
@ -813,7 +804,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
continue
}
switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object, container.IsIndexedContainer(cnr)).status {
switch e.putToShard(ctx, shards[j], addr, object, container.IsIndexedContainer(cnr)).status {
case putToShardSuccess:
res.objEvacuated.Add(1)
e.log.Debug(ctx, logs.EngineObjectIsMovedToAnotherShard,

View file

@ -196,7 +196,6 @@ func TestEvacuateShardObjects(t *testing.T) {
e.mtx.Lock()
delete(e.shards, evacuateShardID)
delete(e.shardPools, evacuateShardID)
e.mtx.Unlock()
checkHasObjects(t)

View file

@ -205,7 +205,7 @@ func BenchmarkInhumeMultipart(b *testing.B) {
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
b.StopTimer()
engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
engine := testNewEngine(b).
setShardsNum(b, numShards).prepare(b).engine
defer func() { require.NoError(b, engine.Close(context.Background())) }()

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -99,13 +98,13 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
var shRes putToShardRes
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
_, ok := e.shards[sh.ID().String()]
e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
return false
}
shRes = e.putToShard(ctx, sh, pool, addr, prm.Object, prm.IsIndexedContainer)
shRes = e.putToShard(ctx, sh, addr, prm.Object, prm.IsIndexedContainer)
return shRes.status != putToShardUnknown
})
switch shRes.status {
@ -122,70 +121,59 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// putToShard puts object to sh.
// Return putToShardStatus and error if it is necessary to propagate an error upper.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard,
addr oid.Address, obj *objectSDK.Object, isIndexedContainer bool,
) (res putToShardRes) {
exitCh := make(chan struct{})
var existPrm shard.ExistsPrm
existPrm.Address = addr
if err := pool.Submit(func() {
defer close(exitCh)
var existPrm shard.ExistsPrm
existPrm.Address = addr
exists, err := sh.Exists(ctx, existPrm)
if err != nil {
if shard.IsErrObjectExpired(err) {
// object is already found but
// expired => do nothing with it
res.status = putToShardExists
} else {
e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
zap.Stringer("shard_id", sh.ID()),
zap.Error(err))
}
return // this is not ErrAlreadyRemoved error so we can go to the next shard
}
if exists.Exists() {
exists, err := sh.Exists(ctx, existPrm)
if err != nil {
if shard.IsErrObjectExpired(err) {
// object is already found but
// expired => do nothing with it
res.status = putToShardExists
return
} else {
e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
zap.Stringer("shard_id", sh.ID()),
zap.Error(err))
}
var putPrm shard.PutPrm
putPrm.SetObject(obj)
putPrm.SetIndexAttributes(isIndexedContainer)
_, err = sh.Put(ctx, putPrm)
if err != nil {
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),
zap.Error(err))
return
}
if client.IsErrObjectAlreadyRemoved(err) {
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),
zap.Error(err))
res.status = putToShardRemoved
res.err = err
return
}
e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
return
}
res.status = putToShardSuccess
}); err != nil {
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard, zap.Error(err))
close(exitCh)
return // this is not ErrAlreadyRemoved error so we can go to the next shard
}
<-exitCh
if exists.Exists() {
res.status = putToShardExists
return
}
var putPrm shard.PutPrm
putPrm.SetObject(obj)
putPrm.SetIndexAttributes(isIndexedContainer)
_, err = sh.Put(ctx, putPrm)
if err != nil {
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),
zap.Error(err))
return
}
if client.IsErrObjectAlreadyRemoved(err) {
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),
zap.Error(err))
res.status = putToShardRemoved
res.err = err
return
}
e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
return
}
res.status = putToShardSuccess
return
}

View file

@ -17,7 +17,6 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/google/uuid"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -181,11 +180,6 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
e.mtx.Lock()
defer e.mtx.Unlock()
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil {
return fmt.Errorf("create pool: %w", err)
}
strID := sh.ID().String()
if _, ok := e.shards[strID]; ok {
return fmt.Errorf("shard with id %s was already added", strID)
@ -199,8 +193,6 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
hash: hrw.StringHash(strID),
}
e.shardPools[strID] = pool
return nil
}
@ -225,12 +217,6 @@ func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
ss = append(ss, sh)
delete(e.shards, id)
pool, ok := e.shardPools[id]
if ok {
pool.Release()
delete(e.shardPools, id)
}
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
zap.String("id", id))
}
@ -429,12 +415,6 @@ func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]ha
delete(e.shards, idStr)
pool, ok := e.shardPools[idStr]
if ok {
pool.Release()
delete(e.shardPools, idStr)
}
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
zap.String("id", idStr))
}

View file

@ -17,7 +17,6 @@ func TestRemoveShard(t *testing.T) {
e, ids := te.engine, te.shardIDs
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.Equal(t, numOfShards, len(e.shardPools))
require.Equal(t, numOfShards, len(e.shards))
removedNum := numOfShards / 2
@ -37,7 +36,6 @@ func TestRemoveShard(t *testing.T) {
}
}
require.Equal(t, numOfShards-removedNum, len(e.shardPools))
require.Equal(t, numOfShards-removedNum, len(e.shards))
for id, removed := range mSh {