[#674] storage engine: Use per-shard worker pools for PUT operation

Make `StorageEngine` to use non-blocking worker pools with the same
(configurable) size for PUT operation. This allows you to switch to using
more free shards when overloading others, thereby more evenly distributing
the write load.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-10-08 15:25:45 +03:00 committed by Alex Vanin
parent b08c6b3f30
commit 5b1975d52a
3 changed files with 73 additions and 36 deletions

View file

@ -4,6 +4,7 @@ import (
"sync"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -15,6 +16,8 @@ type StorageEngine struct {
mtx *sync.RWMutex
shards map[string]*shard.Shard
shardPools map[string]util.WorkerPool
}
// Option represents StorageEngine's constructor option.
@ -24,11 +27,15 @@ type cfg struct {
log *logger.Logger
metrics MetricRegister
shardPoolSize uint32
}
func defaultCfg() *cfg {
return &cfg{
log: zap.L(),
shardPoolSize: 20,
}
}
@ -41,9 +48,10 @@ func New(opts ...Option) *StorageEngine {
}
return &StorageEngine{
cfg: c,
mtx: new(sync.RWMutex),
shards: make(map[string]*shard.Shard),
cfg: c,
mtx: new(sync.RWMutex),
shards: make(map[string]*shard.Shard),
shardPools: make(map[string]util.WorkerPool),
}
}
@ -59,3 +67,10 @@ func WithMetrics(v MetricRegister) Option {
c.metrics = v
}
}
// WithShardPoolSize returns option to specify size of worker pool for each shard.
func WithShardPoolSize(sz uint32) Option {
return func(c *cfg) {
c.shardPoolSize = sz
}
}

View file

@ -49,46 +49,58 @@ func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) {
finished := false
e.iterateOverSortedShards(prm.obj.Address(), func(ind int, s *shard.Shard) (stop bool) {
exists, err := s.Exists(existPrm)
if err != nil {
return false // this is not ErrAlreadyRemoved error so we can go to the next shard
}
e.mtx.RLock()
pool := e.shardPools[s.ID().String()]
e.mtx.RUnlock()
if exists.Exists() {
if ind != 0 {
toMoveItPrm := new(shard.ToMoveItPrm)
toMoveItPrm.WithAddress(prm.obj.Address())
exitCh := make(chan struct{})
_, err = s.ToMoveIt(toMoveItPrm)
if err != nil {
e.log.Warn("could not mark object for shard relocation",
zap.Stringer("shard", s.ID()),
zap.String("error", err.Error()),
)
_ = pool.Submit(func() {
defer close(exitCh)
exists, err := s.Exists(existPrm)
if err != nil {
return // this is not ErrAlreadyRemoved error so we can go to the next shard
}
if exists.Exists() {
if ind != 0 {
toMoveItPrm := new(shard.ToMoveItPrm)
toMoveItPrm.WithAddress(prm.obj.Address())
_, err = s.ToMoveIt(toMoveItPrm)
if err != nil {
e.log.Warn("could not mark object for shard relocation",
zap.Stringer("shard", s.ID()),
zap.String("error", err.Error()),
)
}
}
finished = true
return
}
putPrm := new(shard.PutPrm)
putPrm.WithObject(prm.obj)
_, err = s.Put(putPrm)
if err != nil {
e.log.Warn("could not put object in shard",
zap.Stringer("shard", s.ID()),
zap.String("error", err.Error()),
)
return
}
finished = true
})
return true
}
<-exitCh
putPrm := new(shard.PutPrm)
putPrm.WithObject(prm.obj)
_, err = s.Put(putPrm)
if err != nil {
e.log.Warn("could not put object in shard",
zap.Stringer("shard", s.ID()),
zap.String("error", err.Error()),
)
return false
}
finished = true
return true
return finished
})
if !finished {

View file

@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/panjf2000/ants/v2"
)
var errShardNotFound = errors.New("shard not found")
@ -29,11 +30,20 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
return nil, fmt.Errorf("could not generate shard ID: %w", err)
}
e.shards[id.String()] = shard.New(append(opts,
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil {
return nil, err
}
strID := id.String()
e.shards[strID] = shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
)...)
e.shardPools[strID] = pool
return id, nil
}