forked from TrueCloudLab/frostfs-node
[#1868] Reload config for pprof and metrics on SIGHUP
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent 2b755ddb12
commit 22f3c7d080
12 changed files with 345 additions and 172 deletions
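For orientation: the commit hooks the pprof and Prometheus metrics HTTP servers into the node's SIGHUP reconfiguration path, so both can be enabled, disabled, or re-bound without a restart. A minimal sketch of the general signal-watcher pattern this builds on (illustrative only, not the node's actual signalWatcher):

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGHUP, syscall.SIGTERM)
    for sig := range ch {
        switch sig {
        case syscall.SIGHUP:
            // the node's signal watcher would call reloadConfig() here
            fmt.Println("re-reading configuration")
        default:
            fmt.Println("shutting down")
            return
        }
    }
}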
@@ -10,6 +10,7 @@ Changelog for FrostFS Node
 - New `frostfs_node_object_payload_size` metric for tracking size of regular objects on a single shard (#1794)
 - Add command `frostfs-adm morph netmap-candidates` (#1889)
 - `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
+- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
 
 ### Changed
 - Change `frostfs_node_engine_container_size` to counting sizes of logical objects
@@ -126,7 +126,7 @@ func initHTTPServers(cfg *viper.Viper, log *logger.Logger) []*httputil.Server {
         addr := cfg.GetString(item.cfgPrefix + ".address")
 
-        var prm httputil.Prm
+        var prm httputil.HTTPSrvPrm
 
         prm.Address = addr
         prm.Handler = item.handler()
cmd/frostfs-node/closer.go (new file, +25)
@@ -0,0 +1,25 @@
+package main
+
+type closer struct {
+    name string
+    fn   func()
+}
+
+func getCloser(c *cfg, name string) *closer {
+    for _, clsr := range c.closers {
+        if clsr.name == name {
+            return &clsr
+        }
+    }
+    return nil
+}
+
+func delCloser(c *cfg, name string) {
+    for i, clsr := range c.closers {
+        if clsr.name == name {
+            c.closers[i] = c.closers[len(c.closers)-1]
+            c.closers = c.closers[:len(c.closers)-1]
+            return
+        }
+    }
+}
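The closer registry above is what makes per-component teardown possible during reload: a component's shutdown hook can be looked up and removed by name. A runnable miniature of the same lookup/delete-by-name idiom (the cfg type below is a bare stand-in for the node's configuration object; indexing the slice instead of the loop variable avoids returning the address of a copy):

package main

import "fmt"

type closer struct {
    name string
    fn   func()
}

// cfg is an illustrative stand-in, not the node's real type.
type cfg struct {
    closers []closer
}

func getCloser(c *cfg, name string) *closer {
    for i := range c.closers {
        if c.closers[i].name == name {
            return &c.closers[i]
        }
    }
    return nil
}

func delCloser(c *cfg, name string) {
    for i := range c.closers {
        if c.closers[i].name == name {
            // swap-with-last delete; the order of closers is not preserved
            c.closers[i] = c.closers[len(c.closers)-1]
            c.closers = c.closers[:len(c.closers)-1]
            return
        }
    }
}

func main() {
    c := &cfg{}
    c.closers = append(c.closers, closer{"metrics", func() { fmt.Println("metrics stopped") }})
    if cl := getCloser(c, "metrics"); cl != nil {
        cl.fn() // prints "metrics stopped"
    }
    delCloser(c, "metrics")
    fmt.Println(len(c.closers)) // 0
}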
@@ -24,7 +24,6 @@ import (
     blobovniczaconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
     fstreeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
     loggerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
-    metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
     nodeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
     objectconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
     replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"

@@ -48,6 +47,7 @@ import (
     "github.com/TrueCloudLab/frostfs-node/pkg/network"
     "github.com/TrueCloudLab/frostfs-node/pkg/network/cache"
     "github.com/TrueCloudLab/frostfs-node/pkg/services/control"
+    objectService "github.com/TrueCloudLab/frostfs-node/pkg/services/object"
     getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get"
     "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
     tsourse "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
@@ -308,7 +308,7 @@ type internals struct {
 
     wg      *sync.WaitGroup
     workers []worker
-    closers []func()
+    closers []closer
 
     apiVersion   version.Version
     healthStatus *atomic.Int32
@@ -364,12 +364,16 @@ type shared struct {
     treeService *tree.Service
 
     metricsCollector *metrics.NodeMetrics
+
+    metricsSvc *objectService.MetricCollector
 }
 
 // dynamicConfiguration stores parameters of the
 // components that supports runtime reconfigurations.
 type dynamicConfiguration struct {
     logger *logger.Prm
+    pprof   *httpComponent
+    metrics *httpComponent
 }
 
 type cfg struct {
@@ -612,10 +616,8 @@ func initCfg(appCfg *config.Config) *cfg {
     user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
 
-    if metricsconfig.Enabled(c.appCfg) {
-        c.metricsCollector = metrics.NewNodeMetrics()
-        netState.metrics = c.metricsCollector
-    }
+    c.metricsCollector = metrics.NewNodeMetrics()
+    netState.metrics = c.metricsCollector
 
     c.onShutdown(c.clientCache.CloseAll)   // clean up connections
     c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
@@ -915,11 +917,9 @@ func (c *cfg) ObjectServiceLoad() float64 {
     return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
 }
 
-type dCfg struct {
+type dCmp struct {
     name string
-    cfg  interface {
-        Reload() error
-    }
+    reloadFunc func() error
 }
 
 func (c *cfg) signalWatcher() {
@@ -964,7 +964,7 @@ func (c *cfg) reloadConfig() {
 
     // all the components are expected to support
    // Logger's dynamic reconfiguration approach
-    var components []dCfg
+    var components []dCmp
 
     // Logger
 
@@ -974,7 +974,18 @@ func (c *cfg) reloadConfig() {
         return
     }
 
-    components = append(components, dCfg{name: "logger", cfg: logPrm})
+    components = append(components, dCmp{"logger", logPrm.Reload})
+    if cmp, updated := metricsComponent(c); updated {
+        if cmp.enabled {
+            cmp.preReload = enableMetricsSvc
+        } else {
+            cmp.preReload = disableMetricsSvc
+        }
+        components = append(components, dCmp{cmp.name, cmp.reload})
+    }
+    if cmp, updated := pprofComponent(c); updated {
+        components = append(components, dCmp{cmp.name, cmp.reload})
+    }
 
     // Storage Engine
 
@@ -990,7 +1001,7 @@ func (c *cfg) reloadConfig() {
     }
 
     for _, component := range components {
-        err = component.cfg.Reload()
+        err = component.reloadFunc()
         if err != nil {
             c.log.Error("updated configuration applying",
                 zap.String("component", component.name),
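The reload loop treats every reconfigurable component uniformly as a name plus a reload hook; a failure in one component is logged and does not prevent the others from applying. A self-contained sketch of that pattern (logging simplified to fmt, names illustrative):

package main

import "fmt"

// dCmp mirrors the pair introduced in the diff: a component name plus
// its reload hook. Everything else here is a simplified stand-in.
type dCmp struct {
    name       string
    reloadFunc func() error
}

func main() {
    components := []dCmp{
        {"logger", func() error { fmt.Println("logger reloaded"); return nil }},
        {"metrics", func() error { return fmt.Errorf("bind: address already in use") }},
    }
    for _, component := range components {
        if err := component.reloadFunc(); err != nil {
            // a failing component is reported, the loop moves on
            fmt.Printf("updated configuration applying: component=%s err=%v\n",
                component.name, err)
        }
    }
}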
@@ -1006,7 +1017,7 @@ func (c *cfg) shutdown() {
 
     c.ctxCancel()
     for i := range c.closers {
-        c.closers[len(c.closers)-1-i]()
+        c.closers[len(c.closers)-1-i].fn()
     }
     close(c.internalErr)
 }
cmd/frostfs-node/httpcomponent.go (new file, +70)
@@ -0,0 +1,70 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "net/http"
+    "time"
+
+    httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
+)
+
+type httpComponent struct {
+    address     string
+    name        string
+    handler     http.Handler
+    shutdownDur time.Duration
+    enabled     bool
+    cfg         *cfg
+    preReload   func(c *cfg)
+}
+
+func (cmp *httpComponent) init(c *cfg) {
+    if !cmp.enabled {
+        c.log.Info(fmt.Sprintf("%s is disabled", cmp.name))
+        return
+    }
+    // Init server with parameters
+    srv := httputil.New(
+        *httputil.NewHTTPSrvPrm(
+            cmp.address,
+            cmp.handler,
+        ),
+        httputil.WithShutdownTimeout(
+            cmp.shutdownDur,
+        ),
+    )
+    c.closers = append(c.closers, closer{
+        cmp.name,
+        func() { stopAndLog(c, cmp.name, srv.Shutdown) },
+    })
+    c.workers = append(c.workers, worker{
+        cmp.name,
+        func(ctx context.Context) {
+            runAndLog(c, cmp.name, false, func(c *cfg) {
+                fatalOnErr(srv.Serve())
+            })
+        },
+    })
+}
+
+func (cmp *httpComponent) reload() error {
+    if cmp.preReload != nil {
+        cmp.preReload(cmp.cfg)
+    }
+    // Shutdown server
+    closer := getCloser(cmp.cfg, cmp.name)
+    if closer != nil {
+        closer.fn()
+    }
+    // Cleanup
+    delCloser(cmp.cfg, cmp.name)
+    delWorker(cmp.cfg, cmp.name)
+    // Init server with new parameters
+    cmp.init(cmp.cfg)
+    // Start worker
+    if cmp.enabled {
+        startWorker(cmp.cfg, *getWorker(cmp.cfg, cmp.name))
+    }
+    return nil
+}
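reload() above follows a stop / cleanup / re-init / restart sequence. A compact stand-alone illustration of the same lifecycle using net/http directly (names, address, and timeout are illustrative, not the node's code):

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// component is an illustrative stand-in for httpComponent: it owns one
// HTTP server and can replace it with a freshly configured instance.
type component struct {
    address string
    srv     *http.Server
}

func (c *component) start() {
    c.srv = &http.Server{Addr: c.address, Handler: http.NotFoundHandler()}
    go c.srv.ListenAndServe() // error handling omitted in the sketch
}

func (c *component) reload(newAddr string) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    c.srv.Shutdown(ctx) // stop the old listener
    c.address = newAddr // apply the re-read configuration
    c.start()           // re-init and restart with new parameters
}

func main() {
    c := &component{address: "127.0.0.1:6060"}
    c.start()
    c.reload("127.0.0.1:6061")
    fmt.Println("now serving on", c.address)
}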
@@ -81,8 +81,10 @@ func initApp(c *cfg) {
         c.wg.Done()
     }()
 
-    initAndLog(c, "pprof", initProfiler)
-    initAndLog(c, "prometheus", initMetrics)
+    pprof, _ := pprofComponent(c)
+    metrics, _ := metricsComponent(c)
+    initAndLog(c, pprof.name, pprof.init)
+    initAndLog(c, metrics.name, metrics.init)
 
     initLocalStorage(c)
 
@@ -114,6 +116,19 @@ func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
     }
 }
 
+func stopAndLog(c *cfg, name string, stopper func() error) {
+    c.log.Debug(fmt.Sprintf("shutting down %s service", name))
+
+    err := stopper()
+    if err != nil {
+        c.log.Debug(fmt.Sprintf("could not shutdown %s server", name),
+            zap.String("error", err.Error()),
+        )
+    }
+
+    c.log.Debug(fmt.Sprintf("%s service has been stopped", name))
+}
+
 func bootUp(c *cfg) {
     runAndLog(c, "NATS", true, connectNats)
     runAndLog(c, "gRPC", false, serveGRPC)
@@ -135,5 +150,5 @@ func wait(c *cfg) {
 }
 
 func (c *cfg) onShutdown(f func()) {
-    c.closers = append(c.closers, f)
+    c.closers = append(c.closers, closer{"", f})
 }
@@ -1,47 +1,45 @@
 package main
 
 import (
-    "context"
-
     metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
-    httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
     "github.com/prometheus/client_golang/prometheus/promhttp"
-    "go.uber.org/zap"
 )
 
-func initMetrics(c *cfg) {
-    if !metricsconfig.Enabled(c.appCfg) {
-        c.log.Info("prometheus is disabled")
-        return
+func metricsComponent(c *cfg) (*httpComponent, bool) {
+    var updated bool
+    // check if it has been inited before
+    if c.dynamicConfiguration.metrics == nil {
+        c.dynamicConfiguration.metrics = new(httpComponent)
+        c.dynamicConfiguration.metrics.cfg = c
+        c.dynamicConfiguration.metrics.name = "metrics"
+        c.dynamicConfiguration.metrics.handler = promhttp.Handler()
+        updated = true
     }
 
-    var prm httputil.Prm
-
-    prm.Address = metricsconfig.Address(c.appCfg)
-    prm.Handler = promhttp.Handler()
-
-    srv := httputil.New(prm,
-        httputil.WithShutdownTimeout(
-            metricsconfig.ShutdownTimeout(c.appCfg),
-        ),
-    )
-
-    c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
-        runAndLog(c, "metrics", false, func(c *cfg) {
-            fatalOnErr(srv.Serve())
-        })
-    }))
-
-    c.closers = append(c.closers, func() {
-        c.log.Debug("shutting down metrics service")
-
-        err := srv.Shutdown()
-        if err != nil {
-            c.log.Debug("could not shutdown metrics server",
-                zap.String("error", err.Error()),
-            )
-        }
-
-        c.log.Debug("metrics service has been stopped")
-    })
+    // (re)init read configuration
+    enabled := metricsconfig.Enabled(c.appCfg)
+    if enabled != c.dynamicConfiguration.metrics.enabled {
+        c.dynamicConfiguration.metrics.enabled = enabled
+        updated = true
+    }
+    address := metricsconfig.Address(c.appCfg)
+    if address != c.dynamicConfiguration.metrics.address {
+        c.dynamicConfiguration.metrics.address = address
+        updated = true
+    }
+    dur := metricsconfig.ShutdownTimeout(c.appCfg)
+    if dur != c.dynamicConfiguration.metrics.shutdownDur {
+        c.dynamicConfiguration.metrics.shutdownDur = dur
+        updated = true
+    }
+
+    return c.dynamicConfiguration.metrics, updated
+}
+
+func enableMetricsSvc(c *cfg) {
+    c.shared.metricsSvc.Enable()
+}
+
+func disableMetricsSvc(c *cfg) {
+    c.shared.metricsSvc.Disable()
 }
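metricsComponent (and its pprof twin below) is a lazily-initializing getter: it allocates the component once, then diffs each configuration field against the cached value and reports whether a reload is warranted. The same pattern in miniature (all names here are invented for the sketch):

package main

import "fmt"

// httpComponent is a pared-down stand-in holding only the fields the
// sketch compares.
type httpComponent struct {
    address string
    enabled bool
}

var cached *httpComponent

func metricsComponent(addr string, enabled bool) (*httpComponent, bool) {
    updated := false
    if cached == nil {
        cached = new(httpComponent) // first call always initializes
        updated = true
    }
    if cached.address != addr {
        cached.address = addr
        updated = true
    }
    if cached.enabled != enabled {
        cached.enabled = enabled
        updated = true
    }
    return cached, updated
}

func main() {
    _, u := metricsComponent(":9090", true)
    fmt.Println(u) // true: first call initializes
    _, u = metricsComponent(":9090", true)
    fmt.Println(u) // false: nothing changed, no reload needed
    _, u = metricsComponent(":9091", true)
    fmt.Println(u) // true: address changed, reload required
}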
@@ -8,6 +8,7 @@ import (
     "github.com/TrueCloudLab/frostfs-api-go/v2/object"
     objectGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/object/grpc"
+    metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
     policerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
     replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
     coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
@@ -246,7 +247,11 @@ func initObjectService(c *cfg) {
     traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
 
-    c.workers = append(c.workers, pol)
+    c.workers = append(c.workers, worker{
+        fn: func(ctx context.Context) {
+            pol.Run(ctx)
+        },
+    })
 
     var os putsvc.ObjectStorage = engineWithoutNotifications{
         engine: ls,
@@ -380,12 +385,9 @@ func initObjectService(c *cfg) {
         respSvc,
     )
 
-    var firstSvc objectService.ServiceServer = signSvc
-    if c.metricsCollector != nil {
-        firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
-    }
-
-    server := objectTransportGRPC.New(firstSvc)
+    c.shared.metricsSvc = objectService.NewMetricCollector(
+        signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg))
+    server := objectTransportGRPC.New(c.shared.metricsSvc)
 
     for _, srv := range c.cfgGRPC.servers {
         objectGRPC.RegisterObjectServiceServer(srv, server)
@@ -1,46 +1,37 @@
 package main
 
 import (
-    "context"
-
     profilerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/profiler"
     httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
-    "go.uber.org/zap"
 )
 
-func initProfiler(c *cfg) {
-    if !profilerconfig.Enabled(c.appCfg) {
-        c.log.Info("pprof is disabled")
-        return
+func pprofComponent(c *cfg) (*httpComponent, bool) {
+    var updated bool
+    // check if it has been inited before
+    if c.dynamicConfiguration.pprof == nil {
+        c.dynamicConfiguration.pprof = new(httpComponent)
+        c.dynamicConfiguration.pprof.cfg = c
+        c.dynamicConfiguration.pprof.name = "pprof"
+        c.dynamicConfiguration.pprof.handler = httputil.Handler()
+        updated = true
     }
 
-    var prm httputil.Prm
-
-    prm.Address = profilerconfig.Address(c.appCfg)
-    prm.Handler = httputil.Handler()
-
-    srv := httputil.New(prm,
-        httputil.WithShutdownTimeout(
-            profilerconfig.ShutdownTimeout(c.appCfg),
-        ),
-    )
-
-    c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
-        runAndLog(c, "profiler", false, func(c *cfg) {
-            fatalOnErr(srv.Serve())
-        })
-    }))
-
-    c.closers = append(c.closers, func() {
-        c.log.Debug("shutting down profiling service")
-
-        err := srv.Shutdown()
-        if err != nil {
-            c.log.Debug("could not shutdown pprof server",
-                zap.String("error", err.Error()),
-            )
-        }
-
-        c.log.Debug("profiling service has been stopped")
-    })
+    // (re)init read configuration
+    enabled := profilerconfig.Enabled(c.appCfg)
+    if enabled != c.dynamicConfiguration.pprof.enabled {
+        c.dynamicConfiguration.pprof.enabled = enabled
+        updated = true
+    }
+    address := profilerconfig.Address(c.appCfg)
+    if address != c.dynamicConfiguration.pprof.address {
+        c.dynamicConfiguration.pprof.address = address
+        updated = true
+    }
+    dur := profilerconfig.ShutdownTimeout(c.appCfg)
+    if dur != c.dynamicConfiguration.pprof.shutdownDur {
+        c.dynamicConfiguration.pprof.shutdownDur = dur
+        updated = true
+    }
+
+    return c.dynamicConfiguration.pprof, updated
 }
@@ -4,31 +4,46 @@ import (
     "context"
 )
 
-type worker interface {
-    Run(context.Context)
-}
-
-type workerFromFunc struct {
-    fn func(context.Context)
+type worker struct {
+    name string
+    fn   func(context.Context)
 }
 
 func newWorkerFromFunc(fn func(ctx context.Context)) worker {
-    return &workerFromFunc{
+    return worker{
         fn: fn,
     }
 }
 
-func (w *workerFromFunc) Run(ctx context.Context) {
-    w.fn(ctx)
-}
-
 func startWorkers(c *cfg) {
     for _, wrk := range c.workers {
-        c.wg.Add(1)
-
-        go func(w worker) {
-            w.Run(c.ctx)
-            c.wg.Done()
-        }(wrk)
+        startWorker(c, wrk)
     }
 }
+
+func startWorker(c *cfg, wrk worker) {
+    c.wg.Add(1)
+
+    go func(w worker) {
+        w.fn(c.ctx)
+        c.wg.Done()
+    }(wrk)
+}
+
+func delWorker(c *cfg, name string) {
+    for i, worker := range c.workers {
+        if worker.name == name {
+            c.workers = append(c.workers[:i], c.workers[i+1:]...)
+            return
+        }
+    }
+}
+
+func getWorker(c *cfg, name string) *worker {
+    for _, wrk := range c.workers {
+        if wrk.name == name {
+            return &wrk
+        }
+    }
+    return nil
+}
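Workers gained names for the same reason closers did: a single worker can now be found, removed, and restarted on reload, while startWorkers simply delegates to startWorker. A runnable miniature of the named-worker registry (names illustrative):

package main

import (
    "context"
    "fmt"
    "sync"
)

// worker mirrors the named struct from the diff; cfg plumbing is
// replaced by locals for the sketch.
type worker struct {
    name string
    fn   func(context.Context)
}

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())

    workers := []worker{{name: "pprof", fn: func(ctx context.Context) {
        <-ctx.Done()
        fmt.Println("pprof worker stopped")
    }}}

    // startWorker equivalent: one goroutine per worker, tracked by the WaitGroup
    for _, wrk := range workers {
        wg.Add(1)
        go func(w worker) {
            defer wg.Done()
            w.fn(ctx)
        }(wrk)
    }

    cancel()
    wg.Wait()
}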
@@ -12,6 +12,7 @@ type (
     MetricCollector struct {
         next    ServiceServer
         metrics MetricRegister
+        enabled bool
     }
 
     getStreamMetric struct {
@@ -48,96 +49,125 @@ type (
     }
 )
 
-func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector {
+func NewMetricCollector(next ServiceServer, register MetricRegister, enabled bool) *MetricCollector {
     return &MetricCollector{
         next:    next,
         metrics: register,
+        enabled: enabled,
     }
 }
 
 func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) {
-    t := time.Now()
-    defer func() {
-        m.metrics.IncGetReqCounter(err == nil)
-        m.metrics.AddGetReqDuration(time.Since(t))
-    }()
-
-    err = m.next.Get(req, &getStreamMetric{
-        ServerStream: stream,
-        stream:       stream,
-        metrics:      m.metrics,
-    })
+    if m.enabled {
+        t := time.Now()
+        defer func() {
+            m.metrics.IncGetReqCounter(err == nil)
+            m.metrics.AddGetReqDuration(time.Since(t))
+        }()
+        err = m.next.Get(req, &getStreamMetric{
+            ServerStream: stream,
+            stream:       stream,
+            metrics:      m.metrics,
+        })
+    } else {
+        err = m.next.Get(req, stream)
+    }
     return
 }
 
 func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
-    t := time.Now()
+    if m.enabled {
+        t := time.Now()
 
-    stream, err := m.next.Put(ctx)
-    if err != nil {
-        return nil, err
-    }
+        stream, err := m.next.Put(ctx)
+        if err != nil {
+            return nil, err
+        }
 
-    return &putStreamMetric{
-        stream:  stream,
-        metrics: m.metrics,
-        start:   t,
-    }, nil
+        return &putStreamMetric{
+            stream:  stream,
+            metrics: m.metrics,
+            start:   t,
+        }, nil
+    }
+    return m.next.Put(ctx)
 }
 
 func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
-    t := time.Now()
+    if m.enabled {
+        t := time.Now()
 
-    res, err := m.next.Head(ctx, request)
+        res, err := m.next.Head(ctx, request)
 
-    m.metrics.IncHeadReqCounter(err == nil)
-    m.metrics.AddHeadReqDuration(time.Since(t))
+        m.metrics.IncHeadReqCounter(err == nil)
+        m.metrics.AddHeadReqDuration(time.Since(t))
 
-    return res, err
+        return res, err
+    }
+    return m.next.Head(ctx, request)
 }
 
 func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
-    t := time.Now()
+    if m.enabled {
+        t := time.Now()
 
-    err := m.next.Search(req, stream)
+        err := m.next.Search(req, stream)
 
-    m.metrics.IncSearchReqCounter(err == nil)
-    m.metrics.AddSearchReqDuration(time.Since(t))
+        m.metrics.IncSearchReqCounter(err == nil)
+        m.metrics.AddSearchReqDuration(time.Since(t))
 
-    return err
+        return err
+    }
+    return m.next.Search(req, stream)
 }
 
 func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
-    t := time.Now()
+    if m.enabled {
+        t := time.Now()
 
-    res, err := m.next.Delete(ctx, request)
+        res, err := m.next.Delete(ctx, request)
 
-    m.metrics.IncDeleteReqCounter(err == nil)
-    m.metrics.AddDeleteReqDuration(time.Since(t))
-
-    return res, err
+        m.metrics.IncDeleteReqCounter(err == nil)
+        m.metrics.AddDeleteReqDuration(time.Since(t))
+        return res, err
+    }
+    return m.next.Delete(ctx, request)
 }
 
 func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
-    t := time.Now()
+    if m.enabled {
+        t := time.Now()
 
-    err := m.next.GetRange(req, stream)
+        err := m.next.GetRange(req, stream)
 
-    m.metrics.IncRangeReqCounter(err == nil)
-    m.metrics.AddRangeReqDuration(time.Since(t))
+        m.metrics.IncRangeReqCounter(err == nil)
+        m.metrics.AddRangeReqDuration(time.Since(t))
 
-    return err
+        return err
+    }
+    return m.next.GetRange(req, stream)
 }
 
 func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
-    t := time.Now()
+    if m.enabled {
+        t := time.Now()
 
-    res, err := m.next.GetRangeHash(ctx, request)
+        res, err := m.next.GetRangeHash(ctx, request)
 
-    m.metrics.IncRangeHashReqCounter(err == nil)
-    m.metrics.AddRangeHashReqDuration(time.Since(t))
+        m.metrics.IncRangeHashReqCounter(err == nil)
+        m.metrics.AddRangeHashReqDuration(time.Since(t))
 
-    return res, err
+        return res, err
+    }
+    return m.next.GetRangeHash(ctx, request)
+}
+
+func (m *MetricCollector) Enable() {
+    m.enabled = true
+}
+
+func (m *MetricCollector) Disable() {
+    m.enabled = false
 }
 
 func (s getStreamMetric) Send(resp *object.GetResponse) error {
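Rather than conditionally wiring MetricCollector into the service chain at startup, the middleware is now always present and consults its enabled flag per call; when disabled, every method is a plain pass-through to next. The same toggle in a self-contained sketch (the service interface and names are invented here):

package main

import (
    "fmt"
    "time"
)

// service stands in for the object ServiceServer interface.
type service interface{ Head() error }

type headFn func() error

func (f headFn) Head() error { return f() }

// metricCollector is a minimal pass-through middleware with a runtime
// toggle, mirroring the shape of the change above.
type metricCollector struct {
    next    service
    enabled bool
    total   time.Duration
}

func (m *metricCollector) Head() error {
    if m.enabled {
        t := time.Now()
        err := m.next.Head()
        m.total += time.Since(t) // record duration only when enabled
        return err
    }
    return m.next.Head() // disabled: plain pass-through
}

func main() {
    m := &metricCollector{next: headFn(func() error { return nil })}
    m.Head() // not measured
    m.enabled = true
    m.Head() // measured
    fmt.Println(m.total > 0)
}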
@@ -6,12 +6,12 @@ import (
     "time"
 )
 
-// Prm groups the required parameters of the Server's constructor.
+// HTTPSrvPrm groups the required parameters of the Server's constructor.
 //
 // All values must comply with the requirements imposed on them.
 // Passing incorrect parameter values will result in constructor
 // failure (error or panic depending on the implementation).
-type Prm struct {
+type HTTPSrvPrm struct {
     // TCP address for the server to listen on.
     //
     // Must be a valid TCP address.
@@ -49,6 +49,15 @@ func panicOnValue(t, n string, v interface{}) {
     panic(fmt.Sprintf(invalidValFmt, t, n, v, v))
 }
 
+func checkSrvPrm(addr string, handler http.Handler) {
+    switch {
+    case addr == "":
+        panicOnPrmValue("Address", addr)
+    case handler == nil:
+        panicOnPrmValue("Handler", handler)
+    }
+}
+
 // New creates a new instance of the Server.
 //
 // Panics if at least one value of the parameters is invalid.
@@ -58,13 +67,8 @@ func panicOnValue(t, n string, v interface{}) {
 //
 // The created Server does not require additional
 // initialization and is completely ready for work.
-func New(prm Prm, opts ...Option) *Server {
-    switch {
-    case prm.Address == "":
-        panicOnPrmValue("Address", prm.Address)
-    case prm.Handler == nil:
-        panicOnPrmValue("Handler", prm.Handler)
-    }
+func New(prm HTTPSrvPrm, opts ...Option) *Server {
+    checkSrvPrm(prm.Address, prm.Handler)
 
     c := defaultCfg()
 
@@ -85,3 +89,14 @@ func New(prm HTTPSrvPrm, opts ...Option) *Server {
         },
     }
 }
+
+// NewHTTPSrvPrm creates a new instance of the HTTPSrvPrm.
+//
+// Panics if at least one value of the parameters is invalid.
+func NewHTTPSrvPrm(addr string, handler http.Handler) *HTTPSrvPrm {
+    checkSrvPrm(addr, handler)
+    return &HTTPSrvPrm{
+        Address: addr,
+        Handler: handler,
+    }
+}
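The extracted checkSrvPrm keeps the constructor's fail-fast behavior: an empty address or nil handler panics at construction time, so both New and NewHTTPSrvPrm can assume valid input afterwards. The idiom in miniature (names are illustrative, not the package's API):

package main

import (
    "fmt"
    "net/http"
)

// srvPrm and newSrvPrm are invented stand-ins showing the same
// validate-eagerly pattern: panic on bad input in the constructor.
type srvPrm struct {
    Address string
    Handler http.Handler
}

func newSrvPrm(addr string, handler http.Handler) *srvPrm {
    switch {
    case addr == "":
        panic("incorrect Address: empty")
    case handler == nil:
        panic("incorrect Handler: nil")
    }
    return &srvPrm{Address: addr, Handler: handler}
}

func main() {
    prm := newSrvPrm("127.0.0.1:8080", http.NotFoundHandler())
    fmt.Println("ok:", prm.Address)
}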