Move changes from the support branch. #58

Merged
fyrchik merged 25 commits from move-changes into master 2023-02-20 10:53:28 +00:00
12 changed files with 345 additions and 172 deletions
Showing only changes of commit 50e35de457 - Show all commits

View file

@ -10,6 +10,7 @@ Changelog for FrostFS Node
- New `frostfs_node_object_payload_size` metric for tracking size of reqular objects on a single shard (#1794)
- Add command `frostfs-adm morph netmap-candidates` (#1889)
- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects

View file

@ -126,7 +126,7 @@ func initHTTPServers(cfg *viper.Viper, log *logger.Logger) []*httputil.Server {
addr := cfg.GetString(item.cfgPrefix + ".address")
var prm httputil.Prm
var prm httputil.HTTPSrvPrm
prm.Address = addr
prm.Handler = item.handler()

View file

@ -0,0 +1,25 @@
package main
type closer struct {
name string
fn func()
}
func getCloser(c *cfg, name string) *closer {
for _, clsr := range c.closers {
if clsr.name == name {
return &clsr
}
}
return nil
}
func delCloser(c *cfg, name string) {
for i, clsr := range c.closers {
if clsr.name == name {
c.closers[i] = c.closers[len(c.closers)-1]
c.closers = c.closers[:len(c.closers)-1]
return
}
}
}

View file

@ -24,7 +24,6 @@ import (
blobovniczaconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
nodeconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
@ -48,6 +47,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/network"
"github.com/TrueCloudLab/frostfs-node/pkg/network/cache"
"github.com/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "github.com/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
@ -308,7 +308,7 @@ type internals struct {
wg *sync.WaitGroup
workers []worker
closers []func()
closers []closer
apiVersion version.Version
healthStatus *atomic.Int32
@ -364,12 +364,16 @@ type shared struct {
treeService *tree.Service
metricsCollector *metrics.NodeMetrics
metricsSvc *objectService.MetricCollector
}
// dynamicConfiguration stores parameters of the
// components that supports runtime reconfigurations.
type dynamicConfiguration struct {
logger *logger.Prm
logger *logger.Prm
pprof *httpComponent
metrics *httpComponent
}
type cfg struct {
@ -612,10 +616,8 @@ func initCfg(appCfg *config.Config) *cfg {
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
if metricsconfig.Enabled(c.appCfg) {
c.metricsCollector = metrics.NewNodeMetrics()
netState.metrics = c.metricsCollector
}
c.metricsCollector = metrics.NewNodeMetrics()
netState.metrics = c.metricsCollector
c.onShutdown(c.clientCache.CloseAll) // clean up connections
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
@ -915,11 +917,9 @@ func (c *cfg) ObjectServiceLoad() float64 {
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
}
type dCfg struct {
name string
cfg interface {
Reload() error
}
type dCmp struct {
name string
reloadFunc func() error
}
func (c *cfg) signalWatcher() {
@ -964,7 +964,7 @@ func (c *cfg) reloadConfig() {
// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCfg
var components []dCmp
// Logger
@ -974,7 +974,18 @@ func (c *cfg) reloadConfig() {
return
}
components = append(components, dCfg{name: "logger", cfg: logPrm})
components = append(components, dCmp{"logger", logPrm.Reload})
if cmp, updated := metricsComponent(c); updated {
if cmp.enabled {
cmp.preReload = enableMetricsSvc
} else {
cmp.preReload = disableMetricsSvc
}
components = append(components, dCmp{cmp.name, cmp.reload})
}
if cmp, updated := pprofComponent(c); updated {
components = append(components, dCmp{cmp.name, cmp.reload})
}
// Storage Engine
@ -990,7 +1001,7 @@ func (c *cfg) reloadConfig() {
}
for _, component := range components {
err = component.cfg.Reload()
err = component.reloadFunc()
if err != nil {
c.log.Error("updated configuration applying",
zap.String("component", component.name),
@ -1006,7 +1017,7 @@ func (c *cfg) shutdown() {
c.ctxCancel()
for i := range c.closers {
c.closers[len(c.closers)-1-i]()
c.closers[len(c.closers)-1-i].fn()
}
close(c.internalErr)
}

View file

@ -0,0 +1,70 @@
package main
import (
"context"
"fmt"
"net/http"
"time"
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
)
type httpComponent struct {
address string
name string
handler http.Handler
shutdownDur time.Duration
enabled bool
cfg *cfg
preReload func(c *cfg)
}
func (cmp *httpComponent) init(c *cfg) {
if !cmp.enabled {
c.log.Info(fmt.Sprintf("%s is disabled", cmp.name))
return
}
// Init server with parameters
srv := httputil.New(
*httputil.NewHTTPSrvPrm(
cmp.address,
cmp.handler,
),
httputil.WithShutdownTimeout(
cmp.shutdownDur,
),
)
c.closers = append(c.closers, closer{
cmp.name,
func() { stopAndLog(c, cmp.name, srv.Shutdown) },
})
c.workers = append(c.workers, worker{
cmp.name,
func(ctx context.Context) {
runAndLog(c, cmp.name, false, func(c *cfg) {
fatalOnErr(srv.Serve())
})
},
})
}
func (cmp *httpComponent) reload() error {
if cmp.preReload != nil {
cmp.preReload(cmp.cfg)
}
// Shutdown server
closer := getCloser(cmp.cfg, cmp.name)
if closer != nil {
closer.fn()
}
// Cleanup
delCloser(cmp.cfg, cmp.name)
delWorker(cmp.cfg, cmp.name)
// Init server with new parameters
cmp.init(cmp.cfg)
// Start worker
if cmp.enabled {
startWorker(cmp.cfg, *getWorker(cmp.cfg, cmp.name))
}
return nil
}

View file

@ -81,8 +81,10 @@ func initApp(c *cfg) {
c.wg.Done()
}()
initAndLog(c, "pprof", initProfiler)
initAndLog(c, "prometheus", initMetrics)
pprof, _ := pprofComponent(c)
metrics, _ := metricsComponent(c)
initAndLog(c, pprof.name, pprof.init)
initAndLog(c, metrics.name, metrics.init)
initLocalStorage(c)
@ -114,6 +116,19 @@ func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
}
}
func stopAndLog(c *cfg, name string, stopper func() error) {
c.log.Debug(fmt.Sprintf("shutting down %s service", name))
err := stopper()
if err != nil {
c.log.Debug(fmt.Sprintf("could not shutdown %s server", name),
zap.String("error", err.Error()),
)
}
c.log.Debug(fmt.Sprintf("%s service has been stopped", name))
}
func bootUp(c *cfg) {
runAndLog(c, "NATS", true, connectNats)
runAndLog(c, "gRPC", false, serveGRPC)
@ -135,5 +150,5 @@ func wait(c *cfg) {
}
func (c *cfg) onShutdown(f func()) {
c.closers = append(c.closers, f)
c.closers = append(c.closers, closer{"", f})
}

View file

@ -1,47 +1,45 @@
package main
import (
"context"
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
func initMetrics(c *cfg) {
if !metricsconfig.Enabled(c.appCfg) {
c.log.Info("prometheus is disabled")
return
func metricsComponent(c *cfg) (*httpComponent, bool) {
var updated bool
// check if it has been inited before
if c.dynamicConfiguration.metrics == nil {
c.dynamicConfiguration.metrics = new(httpComponent)
c.dynamicConfiguration.metrics.cfg = c
c.dynamicConfiguration.metrics.name = "metrics"
c.dynamicConfiguration.metrics.handler = promhttp.Handler()
updated = true
}
var prm httputil.Prm
// (re)init read configuration
enabled := metricsconfig.Enabled(c.appCfg)
if enabled != c.dynamicConfiguration.metrics.enabled {
c.dynamicConfiguration.metrics.enabled = enabled
updated = true
}
address := metricsconfig.Address(c.appCfg)
if address != c.dynamicConfiguration.metrics.address {
c.dynamicConfiguration.metrics.address = address
updated = true
}
dur := metricsconfig.ShutdownTimeout(c.appCfg)
if dur != c.dynamicConfiguration.metrics.shutdownDur {
c.dynamicConfiguration.metrics.shutdownDur = dur
updated = true
}
prm.Address = metricsconfig.Address(c.appCfg)
prm.Handler = promhttp.Handler()
srv := httputil.New(prm,
httputil.WithShutdownTimeout(
metricsconfig.ShutdownTimeout(c.appCfg),
),
)
c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
runAndLog(c, "metrics", false, func(c *cfg) {
fatalOnErr(srv.Serve())
})
}))
c.closers = append(c.closers, func() {
c.log.Debug("shutting down metrics service")
err := srv.Shutdown()
if err != nil {
c.log.Debug("could not shutdown metrics server",
zap.String("error", err.Error()),
)
}
c.log.Debug("metrics service has been stopped")
})
return c.dynamicConfiguration.metrics, updated
}
func enableMetricsSvc(c *cfg) {
c.shared.metricsSvc.Enable()
}
func disableMetricsSvc(c *cfg) {
c.shared.metricsSvc.Disable()
}

View file

@ -8,6 +8,7 @@ import (
"github.com/TrueCloudLab/frostfs-api-go/v2/object"
objectGRPC "github.com/TrueCloudLab/frostfs-api-go/v2/object/grpc"
metricsconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/metrics"
policerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
replicatorconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
@ -246,7 +247,11 @@ func initObjectService(c *cfg) {
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
c.workers = append(c.workers, pol)
c.workers = append(c.workers, worker{
fn: func(ctx context.Context) {
pol.Run(ctx)
},
})
var os putsvc.ObjectStorage = engineWithoutNotifications{
engine: ls,
@ -380,12 +385,9 @@ func initObjectService(c *cfg) {
respSvc,
)
var firstSvc objectService.ServiceServer = signSvc
if c.metricsCollector != nil {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}
server := objectTransportGRPC.New(firstSvc)
c.shared.metricsSvc = objectService.NewMetricCollector(
signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg))
server := objectTransportGRPC.New(c.shared.metricsSvc)
for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)

View file

@ -1,46 +1,37 @@
package main
import (
"context"
profilerconfig "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/profiler"
httputil "github.com/TrueCloudLab/frostfs-node/pkg/util/http"
"go.uber.org/zap"
)
func initProfiler(c *cfg) {
if !profilerconfig.Enabled(c.appCfg) {
c.log.Info("pprof is disabled")
return
func pprofComponent(c *cfg) (*httpComponent, bool) {
var updated bool
// check if it has been inited before
if c.dynamicConfiguration.pprof == nil {
c.dynamicConfiguration.pprof = new(httpComponent)
c.dynamicConfiguration.pprof.cfg = c
c.dynamicConfiguration.pprof.name = "pprof"
c.dynamicConfiguration.pprof.handler = httputil.Handler()
updated = true
}
var prm httputil.Prm
// (re)init read configuration
enabled := profilerconfig.Enabled(c.appCfg)
if enabled != c.dynamicConfiguration.pprof.enabled {
c.dynamicConfiguration.pprof.enabled = enabled
updated = true
}
address := profilerconfig.Address(c.appCfg)
if address != c.dynamicConfiguration.pprof.address {
c.dynamicConfiguration.pprof.address = address
updated = true
}
dur := profilerconfig.ShutdownTimeout(c.appCfg)
if dur != c.dynamicConfiguration.pprof.shutdownDur {
c.dynamicConfiguration.pprof.shutdownDur = dur
updated = true
}
prm.Address = profilerconfig.Address(c.appCfg)
prm.Handler = httputil.Handler()
srv := httputil.New(prm,
httputil.WithShutdownTimeout(
profilerconfig.ShutdownTimeout(c.appCfg),
),
)
c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
runAndLog(c, "profiler", false, func(c *cfg) {
fatalOnErr(srv.Serve())
})
}))
c.closers = append(c.closers, func() {
c.log.Debug("shutting down profiling service")
err := srv.Shutdown()
if err != nil {
c.log.Debug("could not shutdown pprof server",
zap.String("error", err.Error()),
)
}
c.log.Debug("profiling service has been stopped")
})
return c.dynamicConfiguration.pprof, updated
}

View file

@ -4,31 +4,46 @@ import (
"context"
)
type worker interface {
Run(context.Context)
}
type workerFromFunc struct {
fn func(context.Context)
type worker struct {
name string
fn func(context.Context)
}
func newWorkerFromFunc(fn func(ctx context.Context)) worker {
return &workerFromFunc{
return worker{
fn: fn,
}
}
func (w *workerFromFunc) Run(ctx context.Context) {
w.fn(ctx)
}
func startWorkers(c *cfg) {
for _, wrk := range c.workers {
c.wg.Add(1)
go func(w worker) {
w.Run(c.ctx)
c.wg.Done()
}(wrk)
startWorker(c, wrk)
}
}
func startWorker(c *cfg, wrk worker) {
c.wg.Add(1)
go func(w worker) {
w.fn(c.ctx)
c.wg.Done()
}(wrk)
}
func delWorker(c *cfg, name string) {
for i, worker := range c.workers {
if worker.name == name {
c.workers = append(c.workers[:i], c.workers[i+1:]...)
return
}
}
}
func getWorker(c *cfg, name string) *worker {
for _, wrk := range c.workers {
if wrk.name == name {
return &wrk
}
}
return nil
}

View file

@ -12,6 +12,7 @@ type (
MetricCollector struct {
next ServiceServer
metrics MetricRegister
enabled bool
}
getStreamMetric struct {
@ -48,96 +49,125 @@ type (
}
)
func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector {
func NewMetricCollector(next ServiceServer, register MetricRegister, enabled bool) *MetricCollector {
return &MetricCollector{
next: next,
metrics: register,
enabled: enabled,
}
}
func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) {
t := time.Now()
defer func() {
m.metrics.IncGetReqCounter(err == nil)
m.metrics.AddGetReqDuration(time.Since(t))
}()
err = m.next.Get(req, &getStreamMetric{
ServerStream: stream,
stream: stream,
metrics: m.metrics,
})
if m.enabled {
t := time.Now()
defer func() {
m.metrics.IncGetReqCounter(err == nil)
m.metrics.AddGetReqDuration(time.Since(t))
}()
err = m.next.Get(req, &getStreamMetric{
ServerStream: stream,
stream: stream,
metrics: m.metrics,
})
} else {
err = m.next.Get(req, stream)
}
return
}
func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
t := time.Now()
if m.enabled {
t := time.Now()
stream, err := m.next.Put(ctx)
if err != nil {
return nil, err
stream, err := m.next.Put(ctx)
if err != nil {
return nil, err
}
return &putStreamMetric{
stream: stream,
metrics: m.metrics,
start: t,
}, nil
}
return &putStreamMetric{
stream: stream,
metrics: m.metrics,
start: t,
}, nil
return m.next.Put(ctx)
}
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
t := time.Now()
if m.enabled {
t := time.Now()
res, err := m.next.Head(ctx, request)
res, err := m.next.Head(ctx, request)
m.metrics.IncHeadReqCounter(err == nil)
m.metrics.AddHeadReqDuration(time.Since(t))
m.metrics.IncHeadReqCounter(err == nil)
m.metrics.AddHeadReqDuration(time.Since(t))
return res, err
return res, err
}
return m.next.Head(ctx, request)
}
func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
t := time.Now()
if m.enabled {
t := time.Now()
err := m.next.Search(req, stream)
err := m.next.Search(req, stream)
m.metrics.IncSearchReqCounter(err == nil)
m.metrics.AddSearchReqDuration(time.Since(t))
m.metrics.IncSearchReqCounter(err == nil)
m.metrics.AddSearchReqDuration(time.Since(t))
return err
return err
}
return m.next.Search(req, stream)
}
func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
t := time.Now()
if m.enabled {
t := time.Now()
res, err := m.next.Delete(ctx, request)
res, err := m.next.Delete(ctx, request)
m.metrics.IncDeleteReqCounter(err == nil)
m.metrics.AddDeleteReqDuration(time.Since(t))
return res, err
m.metrics.IncDeleteReqCounter(err == nil)
m.metrics.AddDeleteReqDuration(time.Since(t))
return res, err
}
return m.next.Delete(ctx, request)
}
func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
t := time.Now()
if m.enabled {
t := time.Now()
err := m.next.GetRange(req, stream)
err := m.next.GetRange(req, stream)
m.metrics.IncRangeReqCounter(err == nil)
m.metrics.AddRangeReqDuration(time.Since(t))
m.metrics.IncRangeReqCounter(err == nil)
m.metrics.AddRangeReqDuration(time.Since(t))
return err
return err
}
return m.next.GetRange(req, stream)
}
func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
t := time.Now()
if m.enabled {
t := time.Now()
res, err := m.next.GetRangeHash(ctx, request)
res, err := m.next.GetRangeHash(ctx, request)
m.metrics.IncRangeHashReqCounter(err == nil)
m.metrics.AddRangeHashReqDuration(time.Since(t))
m.metrics.IncRangeHashReqCounter(err == nil)
m.metrics.AddRangeHashReqDuration(time.Since(t))
return res, err
return res, err
}
return m.next.GetRangeHash(ctx, request)
}
func (m *MetricCollector) Enable() {
m.enabled = true
}
func (m *MetricCollector) Disable() {
m.enabled = false
}
func (s getStreamMetric) Send(resp *object.GetResponse) error {

View file

@ -6,12 +6,12 @@ import (
"time"
)
// Prm groups the required parameters of the Server's constructor.
// HTTPSrvPrm groups the required parameters of the Server's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
type HTTPSrvPrm struct {
// TCP address for the server to listen on.
//
// Must be a valid TCP address.
@ -49,6 +49,15 @@ func panicOnValue(t, n string, v interface{}) {
panic(fmt.Sprintf(invalidValFmt, t, n, v, v))
}
func checkSrvPrm(addr string, handler http.Handler) {
switch {
case addr == "":
panicOnPrmValue("Address", addr)
case handler == nil:
panicOnPrmValue("Handler", handler)
}
}
// New creates a new instance of the Server.
//
// Panics if at least one value of the parameters is invalid.
@ -58,13 +67,8 @@ func panicOnValue(t, n string, v interface{}) {
//
// The created Server does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Server {
switch {
case prm.Address == "":
panicOnPrmValue("Address", prm.Address)
case prm.Handler == nil:
panicOnPrmValue("Handler", prm.Handler)
}
func New(prm HTTPSrvPrm, opts ...Option) *Server {
checkSrvPrm(prm.Address, prm.Handler)
c := defaultCfg()
@ -85,3 +89,14 @@ func New(prm Prm, opts ...Option) *Server {
},
}
}
// NewHTTPSrvPrm creates a new instance of the HTTPSrvPrm.
//
// Panics if at least one value of the parameters is invalid.
func NewHTTPSrvPrm(addr string, handler http.Handler) *HTTPSrvPrm {
checkSrvPrm(addr, handler)
return &HTTPSrvPrm{
Address: addr,
Handler: handler,
}
}