[#482] Fix containers resolving
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
056f168d77
commit
6259caccb8
6 changed files with 38 additions and 51 deletions
|
@ -167,7 +167,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
|
||||||
tp := layer.NewTestFrostFS(key)
|
tp := layer.NewTestFrostFS(key)
|
||||||
|
|
||||||
testResolver := &resolver.Resolver{Name: "test_resolver"}
|
testResolver := &resolver.Resolver{Name: "test_resolver"}
|
||||||
testResolver.SetResolveFunc(func(_ context.Context, name string) (cid.ID, error) {
|
testResolver.SetResolveFunc(func(_ context.Context, _, name string) (cid.ID, error) {
|
||||||
return tp.ContainerID(name)
|
return tp.ContainerID(name)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,7 @@ func (n *Layer) containerInfo(ctx context.Context, prm PrmContainer) (*data.Buck
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
|
zone := n.features.FormContainerZone(reqInfo.Namespace)
|
||||||
if zone != info.Zone {
|
if zone != info.Zone {
|
||||||
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, prm.ContainerID)
|
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, prm.ContainerID)
|
||||||
}
|
}
|
||||||
|
@ -111,7 +111,7 @@ func (n *Layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
|
||||||
p.LocationConstraint = api.DefaultLocationConstraint // s3tests_boto3.functional.test_s3:test_bucket_get_location
|
p.LocationConstraint = api.DefaultLocationConstraint // s3tests_boto3.functional.test_s3:test_bucket_get_location
|
||||||
}
|
}
|
||||||
|
|
||||||
zone, _ := n.features.FormContainerZone(p.Namespace)
|
zone := n.features.FormContainerZone(p.Namespace)
|
||||||
|
|
||||||
bktInfo := &data.BucketInfo{
|
bktInfo := &data.BucketInfo{
|
||||||
Name: p.Name,
|
Name: p.Name,
|
||||||
|
|
|
@ -52,12 +52,12 @@ func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
|
||||||
k.md5Enabled = md5Enabled
|
k.md5Enabled = md5Enabled
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) FormContainerZone(ns string) (zone string, isDefault bool) {
|
func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
||||||
if ns == "" {
|
if ns == "" {
|
||||||
return v2container.SysAttributeZoneDefault, true
|
return v2container.SysAttributeZoneDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
return ns + ".ns", false
|
return ns + ".ns"
|
||||||
}
|
}
|
||||||
|
|
||||||
type TestFrostFS struct {
|
type TestFrostFS struct {
|
||||||
|
|
|
@ -35,14 +35,14 @@ import (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
BucketResolver interface {
|
BucketResolver interface {
|
||||||
Resolve(ctx context.Context, name string) (cid.ID, error)
|
Resolve(ctx context.Context, zone, name string) (cid.ID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
FeatureSettings interface {
|
FeatureSettings interface {
|
||||||
ClientCut() bool
|
ClientCut() bool
|
||||||
BufferMaxSizeForPut() uint64
|
BufferMaxSizeForPut() uint64
|
||||||
MD5Enabled() bool
|
MD5Enabled() bool
|
||||||
FormContainerZone(ns string) (zone string, isDefault bool)
|
FormContainerZone(ns string) string
|
||||||
}
|
}
|
||||||
|
|
||||||
Layer struct {
|
Layer struct {
|
||||||
|
@ -322,13 +322,13 @@ func (n *Layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInf
|
||||||
}
|
}
|
||||||
|
|
||||||
reqInfo := middleware.GetReqInfo(ctx)
|
reqInfo := middleware.GetReqInfo(ctx)
|
||||||
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
|
zone := n.features.FormContainerZone(reqInfo.Namespace)
|
||||||
|
|
||||||
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
|
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
|
||||||
return bktInfo, nil
|
return bktInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
containerID, err := n.ResolveBucket(ctx, name)
|
containerID, err := n.ResolveBucket(ctx, zone, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.Contains(err.Error(), "not found") {
|
if strings.Contains(err.Error(), "not found") {
|
||||||
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchBucket), err.Error())
|
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchBucket), err.Error())
|
||||||
|
@ -352,13 +352,13 @@ func (n *Layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
reqInfo := middleware.GetReqInfo(ctx)
|
reqInfo := middleware.GetReqInfo(ctx)
|
||||||
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
|
zone := n.features.FormContainerZone(reqInfo.Namespace)
|
||||||
|
|
||||||
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
|
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
|
||||||
return bktInfo.CID, nil
|
return bktInfo.CID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.ResolveBucket(ctx, name)
|
return n.ResolveBucket(ctx, zone, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListBuckets returns all user containers. The name of the bucket is a container
|
// ListBuckets returns all user containers. The name of the bucket is a container
|
||||||
|
@ -798,10 +798,10 @@ func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.
|
||||||
return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists)
|
return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) {
|
func (n *Layer) ResolveBucket(ctx context.Context, zone, name string) (cid.ID, error) {
|
||||||
var cnrID cid.ID
|
var cnrID cid.ID
|
||||||
if err := cnrID.DecodeString(name); err != nil {
|
if err := cnrID.DecodeString(name); err != nil {
|
||||||
if cnrID, err = n.resolver.Resolve(ctx, name); err != nil {
|
if cnrID, err = n.resolver.Resolve(ctx, zone, name); err != nil {
|
||||||
return cid.ID{}, err
|
return cid.ID{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
||||||
|
@ -29,20 +29,14 @@ type FrostFS interface {
|
||||||
SystemDNS(context.Context) (string, error)
|
SystemDNS(context.Context) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Settings interface {
|
|
||||||
FormContainerZone(ns string) (zone string, isDefault bool)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
FrostFS FrostFS
|
FrostFS FrostFS
|
||||||
RPCAddress string
|
RPCAddress string
|
||||||
Settings Settings
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type BucketResolver struct {
|
type BucketResolver struct {
|
||||||
rpcAddress string
|
rpcAddress string
|
||||||
frostfs FrostFS
|
frostfs FrostFS
|
||||||
settings Settings
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
resolvers []*Resolver
|
resolvers []*Resolver
|
||||||
|
@ -50,15 +44,15 @@ type BucketResolver struct {
|
||||||
|
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
Name string
|
Name string
|
||||||
resolve func(context.Context, string) (cid.ID, error)
|
resolve func(context.Context, string, string) (cid.ID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resolver) SetResolveFunc(fn func(context.Context, string) (cid.ID, error)) {
|
func (r *Resolver) SetResolveFunc(fn func(context.Context, string, string) (cid.ID, error)) {
|
||||||
r.resolve = fn
|
r.resolve = fn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resolver) Resolve(ctx context.Context, name string) (cid.ID, error) {
|
func (r *Resolver) Resolve(ctx context.Context, zone, name string) (cid.ID, error) {
|
||||||
return r.resolve(ctx, name)
|
return r.resolve(ctx, zone, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBucketResolver(resolverNames []string, cfg *Config) (*BucketResolver, error) {
|
func NewBucketResolver(resolverNames []string, cfg *Config) (*BucketResolver, error) {
|
||||||
|
@ -87,12 +81,12 @@ func createResolvers(resolverNames []string, cfg *Config) ([]*Resolver, error) {
|
||||||
return resolvers, nil
|
return resolvers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *BucketResolver) Resolve(ctx context.Context, bktName string) (cnrID cid.ID, err error) {
|
func (r *BucketResolver) Resolve(ctx context.Context, zone, bktName string) (cnrID cid.ID, err error) {
|
||||||
r.mu.RLock()
|
r.mu.RLock()
|
||||||
defer r.mu.RUnlock()
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
for _, resolver := range r.resolvers {
|
for _, resolver := range r.resolvers {
|
||||||
cnrID, resolverErr := resolver.Resolve(ctx, bktName)
|
cnrID, resolverErr := resolver.Resolve(ctx, zone, bktName)
|
||||||
if resolverErr != nil {
|
if resolverErr != nil {
|
||||||
resolverErr = fmt.Errorf("%s: %w", resolver.Name, resolverErr)
|
resolverErr = fmt.Errorf("%s: %w", resolver.Name, resolverErr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -123,7 +117,6 @@ func (r *BucketResolver) UpdateResolvers(resolverNames []string) error {
|
||||||
cfg := &Config{
|
cfg := &Config{
|
||||||
FrostFS: r.frostfs,
|
FrostFS: r.frostfs,
|
||||||
RPCAddress: r.rpcAddress,
|
RPCAddress: r.rpcAddress,
|
||||||
Settings: r.settings,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resolvers, err := createResolvers(resolverNames, cfg)
|
resolvers, err := createResolvers(resolverNames, cfg)
|
||||||
|
@ -152,30 +145,25 @@ func (r *BucketResolver) equals(resolverNames []string) bool {
|
||||||
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
||||||
switch name {
|
switch name {
|
||||||
case DNSResolver:
|
case DNSResolver:
|
||||||
return NewDNSResolver(cfg.FrostFS, cfg.Settings)
|
return NewDNSResolver(cfg.FrostFS)
|
||||||
case NNSResolver:
|
case NNSResolver:
|
||||||
return NewNNSResolver(cfg.RPCAddress, cfg.Settings)
|
return NewNNSResolver(cfg.RPCAddress)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown resolver: %s", name)
|
return nil, fmt.Errorf("unknown resolver: %s", name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
|
func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
|
||||||
if frostFS == nil {
|
if frostFS == nil {
|
||||||
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
||||||
}
|
}
|
||||||
if settings == nil {
|
|
||||||
return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver")
|
|
||||||
}
|
|
||||||
|
|
||||||
var dns ns.DNS
|
var dns ns.DNS
|
||||||
|
|
||||||
resolveFunc := func(ctx context.Context, name string) (cid.ID, error) {
|
resolveFunc := func(ctx context.Context, zone, name string) (cid.ID, error) {
|
||||||
var err error
|
var err error
|
||||||
reqInfo := middleware.GetReqInfo(ctx)
|
|
||||||
|
|
||||||
zone, isDefault := settings.FormContainerZone(reqInfo.Namespace)
|
if zone == v2container.SysAttributeZoneDefault {
|
||||||
if isDefault {
|
|
||||||
zone, err = frostFS.SystemDNS(ctx)
|
zone, err = frostFS.SystemDNS(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.ID{}, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
return cid.ID{}, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
||||||
|
@ -196,13 +184,10 @@ func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNNSResolver(address string, settings Settings) (*Resolver, error) {
|
func NewNNSResolver(address string) (*Resolver, error) {
|
||||||
if address == "" {
|
if address == "" {
|
||||||
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
|
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
|
||||||
}
|
}
|
||||||
if settings == nil {
|
|
||||||
return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver")
|
|
||||||
}
|
|
||||||
|
|
||||||
var nns ns.NNS
|
var nns ns.NNS
|
||||||
|
|
||||||
|
@ -210,12 +195,9 @@ func NewNNSResolver(address string, settings Settings) (*Resolver, error) {
|
||||||
return nil, fmt.Errorf("dial %s: %w", address, err)
|
return nil, fmt.Errorf("dial %s: %w", address, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resolveFunc := func(ctx context.Context, name string) (cid.ID, error) {
|
resolveFunc := func(_ context.Context, zone, name string) (cid.ID, error) {
|
||||||
var d container.Domain
|
var d container.Domain
|
||||||
d.SetName(name)
|
d.SetName(name)
|
||||||
|
|
||||||
reqInfo := middleware.GetReqInfo(ctx)
|
|
||||||
zone, _ := settings.FormContainerZone(reqInfo.Namespace)
|
|
||||||
d.SetZone(zone)
|
d.SetZone(zone)
|
||||||
|
|
||||||
cnrID, err := nns.ResolveContainerDomain(d)
|
cnrID, err := nns.ResolveContainerDomain(d)
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -404,12 +405,12 @@ func (s *appSettings) NamespaceHeader() string {
|
||||||
return s.namespaceHeader
|
return s.namespaceHeader
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
|
func (s *appSettings) FormContainerZone(ns string) string {
|
||||||
if len(ns) == 0 {
|
if len(ns) == 0 {
|
||||||
return v2container.SysAttributeZoneDefault, true
|
return v2container.SysAttributeZoneDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
return ns + ".ns", false
|
return ns + ".ns"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) isDefaultNamespace(ns string) bool {
|
func (s *appSettings) isDefaultNamespace(ns string) bool {
|
||||||
|
@ -525,7 +526,6 @@ func (a *App) getResolverConfig() *resolver.Config {
|
||||||
return &resolver.Config{
|
return &resolver.Config{
|
||||||
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
||||||
Settings: a.settings,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1062,7 +1062,12 @@ func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data
|
||||||
|
|
||||||
var id cid.ID
|
var id cid.ID
|
||||||
if err = id.DecodeString(containerString); err != nil {
|
if err = id.DecodeString(containerString); err != nil {
|
||||||
if id, err = a.bucketResolver.Resolve(ctx, containerString); err != nil {
|
parts := strings.Split(containerString, ".")
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return nil, fmt.Errorf("invalid container address: %s", containerString)
|
||||||
|
}
|
||||||
|
|
||||||
|
if id, err = a.bucketResolver.Resolve(ctx, parts[1], parts[0]); err != nil {
|
||||||
return nil, fmt.Errorf("resolve container name %s: %w", containerString, err)
|
return nil, fmt.Errorf("resolve container name %s: %w", containerString, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue