forked from TrueCloudLab/frostfs-http-gw
[#160] Add internal/net package with multinet dialer source
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
parent
46c63edd67
commit
69b7761bd6
8 changed files with 261 additions and 1 deletions
3
go.mod
3
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240916093537-13fa0da3741e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/docker/go-units v0.4.0
|
||||
|
@ -25,6 +26,7 @@ require (
|
|||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
|
||||
golang.org/x/net v0.26.0
|
||||
golang.org/x/sys v0.22.0
|
||||
google.golang.org/grpc v1.66.2
|
||||
)
|
||||
|
||||
|
@ -106,7 +108,6 @@ require (
|
|||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.24.0 // indirect
|
||||
golang.org/x/sync v0.7.0 // indirect
|
||||
golang.org/x/sys v0.22.0 // indirect
|
||||
golang.org/x/term v0.21.0 // indirect
|
||||
golang.org/x/text v0.16.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -49,6 +49,8 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98
|
|||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98/go.mod h1:GeNpo12HcEW4J412sH5yf8xFYapxlrt5fcYzRwg0Ino=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=
|
||||
|
|
|
@ -79,4 +79,6 @@ const (
|
|||
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||
ServerReconnectFailed = "failed to reconnect server"
|
||||
WarnDuplicateAddress = "duplicate address"
|
||||
MultinetDialSuccess = "multinet dial successful"
|
||||
MultinetDialFail = "multinet dial failed"
|
||||
)
|
||||
|
|
68
internal/net/config.go
Normal file
68
internal/net/config.go
Normal file
|
@ -0,0 +1,68 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
var errEmptySourceIPList = errors.New("empty source IP list")
|
||||
|
||||
type Subnet struct {
|
||||
Prefix string
|
||||
SourceIPs []string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Enabled bool
|
||||
Subnets []Subnet
|
||||
Balancer string
|
||||
Restrict bool
|
||||
FallbackDelay time.Duration
|
||||
EventHandler multinet.EventHandler
|
||||
}
|
||||
|
||||
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||
var subnets []multinet.Subnet
|
||||
for _, s := range c.Subnets {
|
||||
var ms multinet.Subnet
|
||||
p, err := netip.ParsePrefix(s.Prefix)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err)
|
||||
}
|
||||
ms.Prefix = p
|
||||
for _, ip := range s.SourceIPs {
|
||||
addr, err := netip.ParseAddr(ip)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err)
|
||||
}
|
||||
ms.SourceIPs = append(ms.SourceIPs, addr)
|
||||
}
|
||||
if len(ms.SourceIPs) == 0 {
|
||||
return multinet.Config{}, errEmptySourceIPList
|
||||
}
|
||||
subnets = append(subnets, ms)
|
||||
}
|
||||
return multinet.Config{
|
||||
Subnets: subnets,
|
||||
Balancer: multinet.BalancerType(c.Balancer),
|
||||
Restrict: c.Restrict,
|
||||
FallbackDelay: c.FallbackDelay,
|
||||
Dialer: newDefaultDialer(),
|
||||
EventHandler: c.EventHandler,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c Config) equals(other Config) bool {
|
||||
return c.Enabled == other.Enabled &&
|
||||
slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool {
|
||||
return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs)
|
||||
}) &&
|
||||
c.Balancer == other.Balancer &&
|
||||
c.Restrict == other.Restrict &&
|
||||
c.FallbackDelay == other.FallbackDelay
|
||||
}
|
54
internal/net/dial_target.go
Normal file
54
internal/net/dial_target.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2014 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package net
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// parseDialTarget returns the network and address to pass to dialer.
|
||||
func parseDialTarget(target string) (string, string) {
|
||||
net := "tcp"
|
||||
m1 := strings.Index(target, ":")
|
||||
m2 := strings.Index(target, ":/")
|
||||
// handle unix:addr which will fail with url.Parse
|
||||
if m1 >= 0 && m2 < 0 {
|
||||
if n := target[0:m1]; n == "unix" {
|
||||
return n, target[m1+1:]
|
||||
}
|
||||
}
|
||||
if m2 >= 0 {
|
||||
t, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return net, target
|
||||
}
|
||||
scheme := t.Scheme
|
||||
addr := t.Path
|
||||
if scheme == "unix" {
|
||||
if addr == "" {
|
||||
addr = t.Host
|
||||
}
|
||||
return scheme, addr
|
||||
}
|
||||
}
|
||||
return net, target
|
||||
}
|
36
internal/net/dialer.go
Normal file
36
internal/net/dialer.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func newDefaultDialer() net.Dialer {
|
||||
// From `grpc.WithContextDialer` comment:
|
||||
//
|
||||
// Note: All supported releases of Go (as of December 2023) override the OS
|
||||
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
|
||||
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
|
||||
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
|
||||
// option to true from the Control field. For a concrete example of how to do
|
||||
// this, see internal.NetDialerWithTCPKeepalive().
|
||||
//
|
||||
// https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432
|
||||
//
|
||||
// From `internal.NetDialerWithTCPKeepalive` comment:
|
||||
//
|
||||
// TODO: Once https://github.com/golang/go/issues/62254 lands, and the
|
||||
// appropriate Go version becomes less than our least supported Go version, we
|
||||
// should look into using the new API to make things more straightforward.
|
||||
return net.Dialer{
|
||||
KeepAlive: time.Duration(-1),
|
||||
Control: func(_, _ string, c syscall.RawConn) error {
|
||||
return c.Control(func(fd uintptr) {
|
||||
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
69
internal/net/dialer_source.go
Normal file
69
internal/net/dialer_source.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
type DialerSource struct {
|
||||
guard sync.RWMutex
|
||||
|
||||
c Config
|
||||
|
||||
md multinet.Dialer
|
||||
}
|
||||
|
||||
func NewDialerSource(c Config) (*DialerSource, error) {
|
||||
result := &DialerSource{}
|
||||
if err := result.build(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) build(c Config) error {
|
||||
if c.Enabled {
|
||||
mc, err := c.toMultinetConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
md, err := multinet.NewDialer(mc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.md = md
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
s.md = nil
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
|
||||
// GrpcContextDialer returns grpc.WithContextDialer func.
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, address string) (net.Conn, error) {
|
||||
network, address := parseDialTarget(address)
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) Update(c Config) error {
|
||||
s.guard.Lock()
|
||||
defer s.guard.Unlock()
|
||||
|
||||
if s.c.equals(c) {
|
||||
return nil
|
||||
}
|
||||
return s.build(c)
|
||||
}
|
28
internal/net/event_handler.go
Normal file
28
internal/net/event_handler.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type LogEventHandler struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (l LogEventHandler) DialPerformed(sourceIP net.Addr, _, address string, err error) {
|
||||
sourceIPString := "undefined"
|
||||
if sourceIP != nil {
|
||||
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
|
||||
}
|
||||
if err == nil {
|
||||
l.logger.Debug(logs.MultinetDialSuccess, zap.String("source", sourceIPString), zap.String("destination", address))
|
||||
} else {
|
||||
l.logger.Debug(logs.MultinetDialFail, zap.String("source", sourceIPString), zap.String("destination", address), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func NewLogEventHandler(logger *zap.Logger) LogEventHandler {
|
||||
return LogEventHandler{logger: logger}
|
||||
}
|
Loading…
Reference in a new issue