diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 9843e324..a78b284e 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -15,6 +16,8 @@ type StorageEngine struct { mtx *sync.RWMutex shards map[string]*shard.Shard + + shardPools map[string]util.WorkerPool } // Option represents StorageEngine's constructor option. @@ -24,11 +27,15 @@ type cfg struct { log *logger.Logger metrics MetricRegister + + shardPoolSize uint32 } func defaultCfg() *cfg { return &cfg{ log: zap.L(), + + shardPoolSize: 20, } } @@ -41,9 +48,10 @@ func New(opts ...Option) *StorageEngine { } return &StorageEngine{ - cfg: c, - mtx: new(sync.RWMutex), - shards: make(map[string]*shard.Shard), + cfg: c, + mtx: new(sync.RWMutex), + shards: make(map[string]*shard.Shard), + shardPools: make(map[string]util.WorkerPool), } } @@ -59,3 +67,10 @@ func WithMetrics(v MetricRegister) Option { c.metrics = v } } + +// WithShardPoolSize returns option to specify size of worker pool for each shard. +func WithShardPoolSize(sz uint32) Option { + return func(c *cfg) { + c.shardPoolSize = sz + } +} diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 56f62612..c8d8e55a 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -49,46 +49,58 @@ func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { finished := false e.iterateOverSortedShards(prm.obj.Address(), func(ind int, s *shard.Shard) (stop bool) { - exists, err := s.Exists(existPrm) - if err != nil { - return false // this is not ErrAlreadyRemoved error so we can go to the next shard - } + e.mtx.RLock() + pool := e.shardPools[s.ID().String()] + e.mtx.RUnlock() - if exists.Exists() { - if ind != 0 { - toMoveItPrm := new(shard.ToMoveItPrm) - toMoveItPrm.WithAddress(prm.obj.Address()) + exitCh := make(chan struct{}) - _, err = s.ToMoveIt(toMoveItPrm) - if err != nil { - e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", s.ID()), - zap.String("error", err.Error()), - ) + _ = pool.Submit(func() { + defer close(exitCh) + + exists, err := s.Exists(existPrm) + if err != nil { + return // this is not ErrAlreadyRemoved error so we can go to the next shard + } + + if exists.Exists() { + if ind != 0 { + toMoveItPrm := new(shard.ToMoveItPrm) + toMoveItPrm.WithAddress(prm.obj.Address()) + + _, err = s.ToMoveIt(toMoveItPrm) + if err != nil { + e.log.Warn("could not mark object for shard relocation", + zap.Stringer("shard", s.ID()), + zap.String("error", err.Error()), + ) + } } + + finished = true + + return + } + + putPrm := new(shard.PutPrm) + putPrm.WithObject(prm.obj) + + _, err = s.Put(putPrm) + if err != nil { + e.log.Warn("could not put object in shard", + zap.Stringer("shard", s.ID()), + zap.String("error", err.Error()), + ) + + return } finished = true + }) - return true - } + <-exitCh - putPrm := new(shard.PutPrm) - putPrm.WithObject(prm.obj) - - _, err = s.Put(putPrm) - if err != nil { - e.log.Warn("could not put object in shard", - zap.Stringer("shard", s.ID()), - zap.String("error", err.Error()), - ) - - return false - } - - finished = true - - return true + return finished }) if !finished { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 41633b26..be8219c3 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/hrw" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/panjf2000/ants/v2" ) var errShardNotFound = errors.New("shard not found") @@ -29,11 +30,20 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return nil, fmt.Errorf("could not generate shard ID: %w", err) } - e.shards[id.String()] = shard.New(append(opts, + pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) + if err != nil { + return nil, err + } + + strID := id.String() + + e.shards[strID] = shard.New(append(opts, shard.WithID(id), shard.WithExpiredObjectsCallback(e.processExpiredTombstones), )...) + e.shardPools[strID] = pool + return id, nil }