[#200] Reload resolvers on SIGHUP

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-09-08 19:00:22 +03:00 committed by Kirillov Denis
parent 939f5f0c65
commit 1e05d8a935
3 changed files with 116 additions and 55 deletions

32
app.go
View file

@ -158,6 +158,21 @@ func newApp(ctx context.Context, opt ...Option) App {
a.log.Fatal("failed to dial pool", zap.Error(err)) a.log.Fatal("failed to dial pool", zap.Error(err))
} }
a.initResolver()
a.initMetrics()
return a
}
func (a *app) initResolver() {
var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
if err != nil {
a.log.Fatal("failed to create resolver", zap.Error(err))
}
}
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
resolveCfg := &resolver.Config{ resolveCfg := &resolver.Config{
NeoFS: resolver.NewNeoFSResolver(a.pool), NeoFS: resolver.NewNeoFSResolver(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint), RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
@ -169,18 +184,11 @@ func newApp(ctx context.Context, opt ...Option) App {
a.log.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint)) a.log.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint))
} }
if len(order) != 0 { if len(order) == 0 {
a.resolver, err = resolver.NewResolver(order, resolveCfg) a.log.Info("container resolver will be disabled because of resolvers 'resolver_order' is empty")
if err != nil {
a.log.Fatal("failed to create resolver", zap.Error(err))
}
} else {
a.log.Info("container resolver is disabled")
} }
a.initMetrics() return order, resolveCfg
return a
} }
func (a *app) initMetrics() { func (a *app) initMetrics() {
@ -365,6 +373,10 @@ func (a *app) configReload() {
a.logLevel.SetLevel(lvl) a.logLevel.SetLevel(lvl)
} }
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
a.log.Warn("failed to update resolvers", zap.Error(err))
}
a.stopServices() a.stopServices()
a.startServices() a.startServices()

View file

@ -2,7 +2,9 @@ package resolver
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/ns" "github.com/nspcc-dev/neofs-sdk-go/ns"
@ -13,6 +15,9 @@ const (
DNSResolver = "dns" DNSResolver = "dns"
) )
// ErrNoResolvers returns when trying to resolve container without any resolver.
var ErrNoResolvers = errors.New("no resolvers")
// NeoFS represents virtual connection to the NeoFS network. // NeoFS represents virtual connection to the NeoFS network.
type NeoFS interface { type NeoFS interface {
// SystemDNS reads system DNS network parameters of the NeoFS. // SystemDNS reads system DNS network parameters of the NeoFS.
@ -28,66 +33,116 @@ type Config struct {
} }
type ContainerResolver struct { type ContainerResolver struct {
Name string mu sync.RWMutex
resolve func(context.Context, string) (*cid.ID, error) resolvers []*Resolver
next *ContainerResolver
} }
func (r *ContainerResolver) SetResolveFunc(fn func(context.Context, string) (*cid.ID, error)) { type Resolver struct {
Name string
resolve func(context.Context, string) (*cid.ID, error)
}
func (r *Resolver) SetResolveFunc(fn func(context.Context, string) (*cid.ID, error)) {
r.resolve = fn r.resolve = fn
} }
func (r *ContainerResolver) Resolve(ctx context.Context, name string) (*cid.ID, error) { func (r *Resolver) Resolve(ctx context.Context, name string) (*cid.ID, error) {
cnrID, err := r.resolve(ctx, name) return r.resolve(ctx, name)
}
func NewContainerResolver(resolverNames []string, cfg *Config) (*ContainerResolver, error) {
resolvers, err := createResolvers(resolverNames, cfg)
if err != nil { if err != nil {
if r.next != nil { return nil, err
cnrID, inErr := r.next.Resolve(ctx, name) }
if inErr != nil {
return nil, fmt.Errorf("%s; %w", err.Error(), inErr) return &ContainerResolver{
resolvers: resolvers,
}, nil
}
func createResolvers(resolverNames []string, cfg *Config) ([]*Resolver, error) {
resolvers := make([]*Resolver, len(resolverNames))
for i, name := range resolverNames {
cnrResolver, err := newResolver(name, cfg)
if err != nil {
return nil, err
}
resolvers[i] = cnrResolver
}
return resolvers, nil
}
func (r *ContainerResolver) Resolve(ctx context.Context, cnrName string) (*cid.ID, error) {
r.mu.RLock()
defer r.mu.RUnlock()
var err error
for _, resolver := range r.resolvers {
cnrID, resolverErr := resolver.Resolve(ctx, cnrName)
if resolverErr != nil {
resolverErr = fmt.Errorf("%s: %w", resolver.Name, resolverErr)
if err == nil {
err = resolverErr
} else {
err = fmt.Errorf("%s: %w", err.Error(), resolverErr)
}
continue
} }
return cnrID, nil return cnrID, nil
} }
return nil, err
}
return cnrID, nil
}
func NewResolver(order []string, cfg *Config) (*ContainerResolver, error) {
if len(order) == 0 {
return nil, fmt.Errorf("resolving order must not be empty")
}
bucketResolver, err := newResolver(order[len(order)-1], cfg, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for i := len(order) - 2; i >= 0; i-- { return nil, ErrNoResolvers
resolverName := order[i]
next := bucketResolver
bucketResolver, err = newResolver(resolverName, cfg, next)
if err != nil {
return nil, err
}
}
return bucketResolver, nil
} }
func newResolver(name string, cfg *Config, next *ContainerResolver) (*ContainerResolver, error) { func (r *ContainerResolver) UpdateResolvers(resolverNames []string, cfg *Config) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.equals(resolverNames) {
return nil
}
resolvers, err := createResolvers(resolverNames, cfg)
if err != nil {
return err
}
r.resolvers = resolvers
return nil
}
func (r *ContainerResolver) equals(resolverNames []string) bool {
if len(r.resolvers) != len(resolverNames) {
return false
}
for i := 0; i < len(resolverNames); i++ {
if r.resolvers[i].Name != resolverNames[i] {
return false
}
}
return true
}
func newResolver(name string, cfg *Config) (*Resolver, error) {
switch name { switch name {
case DNSResolver: case DNSResolver:
return NewDNSResolver(cfg.NeoFS, next) return NewDNSResolver(cfg.NeoFS)
case NNSResolver: case NNSResolver:
return NewNNSResolver(cfg.RPCAddress, next) return NewNNSResolver(cfg.RPCAddress)
default: default:
return nil, fmt.Errorf("unknown resolver: %s", name) return nil, fmt.Errorf("unknown resolver: %s", name)
} }
} }
func NewDNSResolver(neoFS NeoFS, next *ContainerResolver) (*ContainerResolver, error) { func NewDNSResolver(neoFS NeoFS) (*Resolver, error) {
if neoFS == nil { if neoFS == nil {
return nil, fmt.Errorf("pool must not be nil for DNS resolver") return nil, fmt.Errorf("pool must not be nil for DNS resolver")
} }
@ -108,15 +163,13 @@ func NewDNSResolver(neoFS NeoFS, next *ContainerResolver) (*ContainerResolver, e
return &cnrID, nil return &cnrID, nil
} }
return &ContainerResolver{ return &Resolver{
Name: DNSResolver, Name: DNSResolver,
resolve: resolveFunc, resolve: resolveFunc,
next: next,
}, nil }, nil
} }
func NewNNSResolver(rpcAddress string, next *ContainerResolver) (*ContainerResolver, error) { func NewNNSResolver(rpcAddress string) (*Resolver, error) {
var nns ns.NNS var nns ns.NNS
if err := nns.Dial(rpcAddress); err != nil { if err := nns.Dial(rpcAddress); err != nil {
@ -131,10 +184,8 @@ func NewNNSResolver(rpcAddress string, next *ContainerResolver) (*ContainerResol
return &cnrID, nil return &cnrID, nil
} }
return &ContainerResolver{ return &Resolver{
Name: NNSResolver, Name: NNSResolver,
resolve: resolveFunc, resolve: resolveFunc,
next: next,
}, nil }, nil
} }

View file

@ -13,9 +13,7 @@ func GetContainerID(ctx context.Context, containerID string, resolver *resolver.
cnrID := new(cid.ID) cnrID := new(cid.ID)
err := cnrID.DecodeString(containerID) err := cnrID.DecodeString(containerID)
if err != nil { if err != nil {
if resolver != nil {
cnrID, err = resolver.Resolve(ctx, containerID) cnrID, err = resolver.Resolve(ctx, containerID)
} }
}
return cnrID, err return cnrID, err
} }