From 69b7761bd6e03cad3f70bffe592716ac8bb52489 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 29 Oct 2024 17:45:42 +0300 Subject: [PATCH] [#160] Add internal/net package with multinet dialer source Signed-off-by: Alex Vanin --- go.mod | 3 +- go.sum | 2 + internal/logs/logs.go | 2 + internal/net/config.go | 68 ++++++++++++++++++++++++++++++++++ internal/net/dial_target.go | 54 +++++++++++++++++++++++++++ internal/net/dialer.go | 36 ++++++++++++++++++ internal/net/dialer_source.go | 69 +++++++++++++++++++++++++++++++++++ internal/net/event_handler.go | 28 ++++++++++++++ 8 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 internal/net/config.go create mode 100644 internal/net/dial_target.go create mode 100644 internal/net/dialer.go create mode 100644 internal/net/dialer_source.go create mode 100644 internal/net/event_handler.go diff --git a/go.mod b/go.mod index d1a3788..efb79eb 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240916093537-13fa0da3741e git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98 + git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 github.com/bluele/gcache v0.0.2 github.com/docker/go-units v0.4.0 @@ -25,6 +26,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 golang.org/x/net v0.26.0 + golang.org/x/sys v0.22.0 google.golang.org/grpc v1.66.2 ) @@ -106,7 +108,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.22.0 // indirect golang.org/x/term v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index bc433eb..ce02ee6 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98/go.mod h1:GeNpo12HcEW4J412sH5yf8xFYapxlrt5fcYzRwg0Ino= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= +git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8= +git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc= git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA= diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 96bdaa5..7b7ddc1 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -79,4 +79,6 @@ const ( ServerReconnectedSuccessfully = "server reconnected successfully" ServerReconnectFailed = "failed to reconnect server" WarnDuplicateAddress = "duplicate address" + MultinetDialSuccess = "multinet dial successful" + MultinetDialFail = "multinet dial failed" ) diff --git a/internal/net/config.go b/internal/net/config.go new file mode 100644 index 0000000..b40e003 --- /dev/null +++ b/internal/net/config.go @@ -0,0 +1,68 @@ +package net + +import ( + "errors" + "fmt" + "net/netip" + "slices" + "time" + + "git.frostfs.info/TrueCloudLab/multinet" +) + +var errEmptySourceIPList = errors.New("empty source IP list") + +type Subnet struct { + Prefix string + SourceIPs []string +} + +type Config struct { + Enabled bool + Subnets []Subnet + Balancer string + Restrict bool + FallbackDelay time.Duration + EventHandler multinet.EventHandler +} + +func (c Config) toMultinetConfig() (multinet.Config, error) { + var subnets []multinet.Subnet + for _, s := range c.Subnets { + var ms multinet.Subnet + p, err := netip.ParsePrefix(s.Prefix) + if err != nil { + return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err) + } + ms.Prefix = p + for _, ip := range s.SourceIPs { + addr, err := netip.ParseAddr(ip) + if err != nil { + return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err) + } + ms.SourceIPs = append(ms.SourceIPs, addr) + } + if len(ms.SourceIPs) == 0 { + return multinet.Config{}, errEmptySourceIPList + } + subnets = append(subnets, ms) + } + return multinet.Config{ + Subnets: subnets, + Balancer: multinet.BalancerType(c.Balancer), + Restrict: c.Restrict, + FallbackDelay: c.FallbackDelay, + Dialer: newDefaultDialer(), + EventHandler: c.EventHandler, + }, nil +} + +func (c Config) equals(other Config) bool { + return c.Enabled == other.Enabled && + slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool { + return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs) + }) && + c.Balancer == other.Balancer && + c.Restrict == other.Restrict && + c.FallbackDelay == other.FallbackDelay +} diff --git a/internal/net/dial_target.go b/internal/net/dial_target.go new file mode 100644 index 0000000..6265f18 --- /dev/null +++ b/internal/net/dial_target.go @@ -0,0 +1,54 @@ +// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go + +/* + * + * Copyright 2014 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package net + +import ( + "net/url" + "strings" +) + +// parseDialTarget returns the network and address to pass to dialer. +func parseDialTarget(target string) (string, string) { + net := "tcp" + m1 := strings.Index(target, ":") + m2 := strings.Index(target, ":/") + // handle unix:addr which will fail with url.Parse + if m1 >= 0 && m2 < 0 { + if n := target[0:m1]; n == "unix" { + return n, target[m1+1:] + } + } + if m2 >= 0 { + t, err := url.Parse(target) + if err != nil { + return net, target + } + scheme := t.Scheme + addr := t.Path + if scheme == "unix" { + if addr == "" { + addr = t.Host + } + return scheme, addr + } + } + return net, target +} diff --git a/internal/net/dialer.go b/internal/net/dialer.go new file mode 100644 index 0000000..8441dd5 --- /dev/null +++ b/internal/net/dialer.go @@ -0,0 +1,36 @@ +package net + +import ( + "net" + "syscall" + "time" + + "golang.org/x/sys/unix" +) + +func newDefaultDialer() net.Dialer { + // From `grpc.WithContextDialer` comment: + // + // Note: All supported releases of Go (as of December 2023) override the OS + // defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive + // with OS defaults for keepalive time and interval, use a net.Dialer that sets + // the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket + // option to true from the Control field. For a concrete example of how to do + // this, see internal.NetDialerWithTCPKeepalive(). + // + // https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432 + // + // From `internal.NetDialerWithTCPKeepalive` comment: + // + // TODO: Once https://github.com/golang/go/issues/62254 lands, and the + // appropriate Go version becomes less than our least supported Go version, we + // should look into using the new API to make things more straightforward. + return net.Dialer{ + KeepAlive: time.Duration(-1), + Control: func(_, _ string, c syscall.RawConn) error { + return c.Control(func(fd uintptr) { + _ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) + }) + }, + } +} diff --git a/internal/net/dialer_source.go b/internal/net/dialer_source.go new file mode 100644 index 0000000..e6a142a --- /dev/null +++ b/internal/net/dialer_source.go @@ -0,0 +1,69 @@ +package net + +import ( + "context" + "net" + "sync" + + "git.frostfs.info/TrueCloudLab/multinet" +) + +type DialerSource struct { + guard sync.RWMutex + + c Config + + md multinet.Dialer +} + +func NewDialerSource(c Config) (*DialerSource, error) { + result := &DialerSource{} + if err := result.build(c); err != nil { + return nil, err + } + return result, nil +} + +func (s *DialerSource) build(c Config) error { + if c.Enabled { + mc, err := c.toMultinetConfig() + if err != nil { + return err + } + md, err := multinet.NewDialer(mc) + if err != nil { + return err + } + s.md = md + s.c = c + return nil + } + s.md = nil + s.c = c + return nil +} + +// GrpcContextDialer returns grpc.WithContextDialer func. +// Returns nil if multinet disabled. +func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) { + s.guard.RLock() + defer s.guard.RUnlock() + + if s.c.Enabled { + return func(ctx context.Context, address string) (net.Conn, error) { + network, address := parseDialTarget(address) + return s.md.DialContext(ctx, network, address) + } + } + return nil +} + +func (s *DialerSource) Update(c Config) error { + s.guard.Lock() + defer s.guard.Unlock() + + if s.c.equals(c) { + return nil + } + return s.build(c) +} diff --git a/internal/net/event_handler.go b/internal/net/event_handler.go new file mode 100644 index 0000000..9520c01 --- /dev/null +++ b/internal/net/event_handler.go @@ -0,0 +1,28 @@ +package net + +import ( + "net" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "go.uber.org/zap" +) + +type LogEventHandler struct { + logger *zap.Logger +} + +func (l LogEventHandler) DialPerformed(sourceIP net.Addr, _, address string, err error) { + sourceIPString := "undefined" + if sourceIP != nil { + sourceIPString = sourceIP.Network() + "://" + sourceIP.String() + } + if err == nil { + l.logger.Debug(logs.MultinetDialSuccess, zap.String("source", sourceIPString), zap.String("destination", address)) + } else { + l.logger.Debug(logs.MultinetDialFail, zap.String("source", sourceIPString), zap.String("destination", address), zap.Error(err)) + } +} + +func NewLogEventHandler(logger *zap.Logger) LogEventHandler { + return LogEventHandler{logger: logger} +}