[#521] Add internal/net package with multinet dialer source
Code is taken from frostfs-node#1422 Author: Dmitrii Stepanov (dstepanov-yadro) Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
parent
e35b582fe2
commit
b7a5f91da5
6 changed files with 237 additions and 1 deletions
3
go.mod
3
go.mod
|
@ -7,6 +7,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240924152932-1b67ab960848
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/aws/aws-sdk-go v1.44.6
|
||||
|
@ -34,6 +35,7 @@ require (
|
|||
golang.org/x/crypto v0.24.0
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
|
||||
golang.org/x/net v0.26.0
|
||||
golang.org/x/sys v0.22.0
|
||||
golang.org/x/text v0.16.0
|
||||
google.golang.org/grpc v1.66.2
|
||||
google.golang.org/protobuf v1.34.2
|
||||
|
@ -92,7 +94,6 @@ require (
|
|||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/sync v0.7.0 // indirect
|
||||
golang.org/x/sys v0.22.0 // indirect
|
||||
golang.org/x/term v0.21.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -48,6 +48,8 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240924152932-1b67ab960848
|
|||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240924152932-1b67ab960848/go.mod h1:E4aAVE5GrOzQxpAjkS472CQXpwRjAmrkO1a9sxy/x7Y=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b h1:M50kdfrf/h8c3cz0bJ2AEUcbXvAlPFVC1Wp1WkfZ/8E=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b/go.mod h1:GZTk55RI4dKzsK6BCn5h2xxE28UHNfgoq/NJxW/LQ6A=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
|
|
66
internal/net/config.go
Normal file
66
internal/net/config.go
Normal file
|
@ -0,0 +1,66 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
var errEmptySourceIPList = errors.New("empty source IP list")
|
||||
|
||||
type Subnet struct {
|
||||
Prefix string
|
||||
SourceIPs []string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Enabled bool
|
||||
Subnets []Subnet
|
||||
Balancer string
|
||||
Restrict bool
|
||||
FallbackDelay time.Duration
|
||||
}
|
||||
|
||||
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||
var subnets []multinet.Subnet
|
||||
for _, s := range c.Subnets {
|
||||
var ms multinet.Subnet
|
||||
p, err := netip.ParsePrefix(s.Prefix)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err)
|
||||
}
|
||||
ms.Prefix = p
|
||||
for _, ip := range s.SourceIPs {
|
||||
addr, err := netip.ParseAddr(ip)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err)
|
||||
}
|
||||
ms.SourceIPs = append(ms.SourceIPs, addr)
|
||||
}
|
||||
if len(ms.SourceIPs) == 0 {
|
||||
return multinet.Config{}, errEmptySourceIPList
|
||||
}
|
||||
subnets = append(subnets, ms)
|
||||
}
|
||||
return multinet.Config{
|
||||
Subnets: subnets,
|
||||
Balancer: multinet.BalancerType(c.Balancer),
|
||||
Restrict: c.Restrict,
|
||||
FallbackDelay: c.FallbackDelay,
|
||||
Dialer: newDefaulDialer(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c Config) equals(other Config) bool {
|
||||
return c.Enabled == other.Enabled &&
|
||||
slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool {
|
||||
return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs)
|
||||
}) &&
|
||||
c.Balancer == other.Balancer &&
|
||||
c.Restrict == other.Restrict &&
|
||||
c.FallbackDelay == other.FallbackDelay
|
||||
}
|
54
internal/net/dial_target.go
Normal file
54
internal/net/dial_target.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2014 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package net
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// parseDialTarget returns the network and address to pass to dialer.
|
||||
func parseDialTarget(target string) (string, string) {
|
||||
net := "tcp"
|
||||
m1 := strings.Index(target, ":")
|
||||
m2 := strings.Index(target, ":/")
|
||||
// handle unix:addr which will fail with url.Parse
|
||||
if m1 >= 0 && m2 < 0 {
|
||||
if n := target[0:m1]; n == "unix" {
|
||||
return n, target[m1+1:]
|
||||
}
|
||||
}
|
||||
if m2 >= 0 {
|
||||
t, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return net, target
|
||||
}
|
||||
scheme := t.Scheme
|
||||
addr := t.Path
|
||||
if scheme == "unix" {
|
||||
if addr == "" {
|
||||
addr = t.Host
|
||||
}
|
||||
return scheme, addr
|
||||
}
|
||||
}
|
||||
return net, target
|
||||
}
|
30
internal/net/dialer.go
Normal file
30
internal/net/dialer.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func newDefaulDialer() net.Dialer {
|
||||
// From `grpc.WithContextDialer` comment:
|
||||
//
|
||||
// Note: All supported releases of Go (as of December 2023) override the OS
|
||||
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
|
||||
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
|
||||
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
|
||||
// option to true from the Control field. For a concrete example of how to do
|
||||
// this, see internal.NetDialerWithTCPKeepalive().
|
||||
//
|
||||
// https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432
|
||||
return net.Dialer{
|
||||
KeepAlive: time.Duration(-1),
|
||||
Control: func(_, _ string, c syscall.RawConn) error {
|
||||
return c.Control(func(fd uintptr) {
|
||||
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
83
internal/net/dialer_source.go
Normal file
83
internal/net/dialer_source.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
type DialerSource struct {
|
||||
guard sync.RWMutex
|
||||
|
||||
c Config
|
||||
|
||||
md multinet.Dialer
|
||||
}
|
||||
|
||||
func NewDialerSource(c Config) (*DialerSource, error) {
|
||||
result := &DialerSource{}
|
||||
if err := result.build(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) build(c Config) error {
|
||||
if c.Enabled {
|
||||
mc, err := c.toMultinetConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
md, err := multinet.NewDialer(mc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.md = md
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
s.md = nil
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
|
||||
// GrpcContextDialer returns grpc.WithContextDialer func.
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, address string) (net.Conn, error) {
|
||||
network, address := parseDialTarget(address)
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NetContextDialer returns net.DialContext dial function.
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) Update(c Config) error {
|
||||
s.guard.Lock()
|
||||
defer s.guard.Unlock()
|
||||
|
||||
if s.c.equals(c) {
|
||||
return nil
|
||||
}
|
||||
return s.build(c)
|
||||
}
|
Loading…
Reference in a new issue