From 5b1975d52a947e9f4ec34e1a89f18dfcee60f81d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 8 Oct 2021 15:25:45 +0300 Subject: [PATCH] [#674] storage engine: Use per-shard worker pools for PUT operation Make `StorageEngine` to use non-blocking worker pools with the same (configurable) size for PUT operation. This allows you to switch to using more free shards when overloading others, thereby more evenly distributing the write load. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/engine/engine.go | 21 ++++++- pkg/local_object_storage/engine/put.go | 76 +++++++++++++---------- pkg/local_object_storage/engine/shards.go | 12 +++- 3 files changed, 73 insertions(+), 36 deletions(-) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 9843e3248..a78b284e7 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -15,6 +16,8 @@ type StorageEngine struct { mtx *sync.RWMutex shards map[string]*shard.Shard + + shardPools map[string]util.WorkerPool } // Option represents StorageEngine's constructor option. @@ -24,11 +27,15 @@ type cfg struct { log *logger.Logger metrics MetricRegister + + shardPoolSize uint32 } func defaultCfg() *cfg { return &cfg{ log: zap.L(), + + shardPoolSize: 20, } } @@ -41,9 +48,10 @@ func New(opts ...Option) *StorageEngine { } return &StorageEngine{ - cfg: c, - mtx: new(sync.RWMutex), - shards: make(map[string]*shard.Shard), + cfg: c, + mtx: new(sync.RWMutex), + shards: make(map[string]*shard.Shard), + shardPools: make(map[string]util.WorkerPool), } } @@ -59,3 +67,10 @@ func WithMetrics(v MetricRegister) Option { c.metrics = v } } + +// WithShardPoolSize returns option to specify size of worker pool for each shard. +func WithShardPoolSize(sz uint32) Option { + return func(c *cfg) { + c.shardPoolSize = sz + } +} diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 56f62612d..c8d8e55a6 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -49,46 +49,58 @@ func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { finished := false e.iterateOverSortedShards(prm.obj.Address(), func(ind int, s *shard.Shard) (stop bool) { - exists, err := s.Exists(existPrm) - if err != nil { - return false // this is not ErrAlreadyRemoved error so we can go to the next shard - } + e.mtx.RLock() + pool := e.shardPools[s.ID().String()] + e.mtx.RUnlock() - if exists.Exists() { - if ind != 0 { - toMoveItPrm := new(shard.ToMoveItPrm) - toMoveItPrm.WithAddress(prm.obj.Address()) + exitCh := make(chan struct{}) - _, err = s.ToMoveIt(toMoveItPrm) - if err != nil { - e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", s.ID()), - zap.String("error", err.Error()), - ) + _ = pool.Submit(func() { + defer close(exitCh) + + exists, err := s.Exists(existPrm) + if err != nil { + return // this is not ErrAlreadyRemoved error so we can go to the next shard + } + + if exists.Exists() { + if ind != 0 { + toMoveItPrm := new(shard.ToMoveItPrm) + toMoveItPrm.WithAddress(prm.obj.Address()) + + _, err = s.ToMoveIt(toMoveItPrm) + if err != nil { + e.log.Warn("could not mark object for shard relocation", + zap.Stringer("shard", s.ID()), + zap.String("error", err.Error()), + ) + } } + + finished = true + + return + } + + putPrm := new(shard.PutPrm) + putPrm.WithObject(prm.obj) + + _, err = s.Put(putPrm) + if err != nil { + e.log.Warn("could not put object in shard", + zap.Stringer("shard", s.ID()), + zap.String("error", err.Error()), + ) + + return } finished = true + }) - return true - } + <-exitCh - putPrm := new(shard.PutPrm) - putPrm.WithObject(prm.obj) - - _, err = s.Put(putPrm) - if err != nil { - e.log.Warn("could not put object in shard", - zap.Stringer("shard", s.ID()), - zap.String("error", err.Error()), - ) - - return false - } - - finished = true - - return true + return finished }) if !finished { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 41633b260..be8219c37 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/hrw" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/panjf2000/ants/v2" ) var errShardNotFound = errors.New("shard not found") @@ -29,11 +30,20 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return nil, fmt.Errorf("could not generate shard ID: %w", err) } - e.shards[id.String()] = shard.New(append(opts, + pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) + if err != nil { + return nil, err + } + + strID := id.String() + + e.shards[strID] = shard.New(append(opts, shard.WithID(id), shard.WithExpiredObjectsCallback(e.processExpiredTombstones), )...) + e.shardPools[strID] = pool + return id, nil }