From f52f643fe45b3eca787d1d0585934023a2e27e4d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 21 Oct 2020 12:28:42 +0300 Subject: [PATCH] [#108] cmd/neofs-node: Add Policer worker to application Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 8 ++++++++ cmd/neofs-node/object.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index cde64c2b..8d7e2ced 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -80,6 +80,10 @@ const ( cfgGCQueueSize = "gc.queuesize" cfgGCQueueTick = "gc.duration.sleep" cfgGCTimeout = "gc.duration.timeout" + + cfgPolicerWorkScope = "policer.work_scope" + cfgPolicerExpRate = "policer.expansion_rate" + cfgPolicerHeadTimeout = "policer.head_timeout" ) const ( @@ -316,6 +320,10 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgGCQueueSize, 1000) v.SetDefault(cfgGCQueueTick, "5s") v.SetDefault(cfgGCTimeout, "5s") + + v.SetDefault(cfgPolicerWorkScope, 100) + v.SetDefault(cfgPolicerExpRate, 10) // in % + v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second) } func (c *cfg) LocalAddress() *network.Address { diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index fb035d15..88dbdc37 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -12,6 +12,7 @@ import ( objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" objectService "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/acl" @@ -32,6 +33,8 @@ import ( searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/panjf2000/ants/v2" ) @@ -183,6 +186,43 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, objGC) + ch := make(chan *policer.Task, 1) + + pol := policer.New( + policer.WithLogger(c.log), + policer.WithLocalStorage(ls), + policer.WithContainerSource(c.cfgObject.cnrStorage), + policer.WithPlacementBuilder( + placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapStorage), + ), + policer.WithWorkScope( + c.viper.GetInt(cfgPolicerWorkScope), + ), + policer.WithExpansionRate( + c.viper.GetInt(cfgPolicerExpRate), + ), + policer.WithTrigger(ch), + policer.WithRemoteHeader( + headsvc.NewRemoteHeader(keyStorage), + ), + policer.WithLocalAddressSource(c), + policer.WithHeadTimeout( + c.viper.GetDuration(cfgPolicerHeadTimeout), + ), + ) + + addNewEpochNotificationHandler(c, func(ev event.Event) { + select { + case ch <- new(policer.Task): + case <-c.ctx.Done(): + close(ch) + default: + c.log.Info("policer is busy") + } + }) + + c.workers = append(c.workers, pol) + sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithMaxSizeSource(&maxSzSrc{c.cfgObject.maxObjectSize}),