From be95531bfdb4d82145dd12ac18a5e59559b9fad5 Mon Sep 17 00:00:00 2001 From: Aleksey Kravchenko Date: Wed, 22 Jan 2025 19:07:48 +0300 Subject: [PATCH] [#5] Add container zone names support. Signed-off-by: Aleksey Kravchenko --- backend/frostfs/frostfs.go | 151 ++++++++++++++++++++--------------- backend/frostfs/util.go | 21 ++++- backend/frostfs/util_test.go | 51 ++++++++++++ 3 files changed, 156 insertions(+), 67 deletions(-) diff --git a/backend/frostfs/frostfs.go b/backend/frostfs/frostfs.go index c05b516e5..bb9e93d37 100644 --- a/backend/frostfs/frostfs.go +++ b/backend/frostfs/frostfs.go @@ -127,6 +127,11 @@ func init() { }, }, }, + { + Name: "default_container_zone", + Default: "container", + Help: "The name of the zone in which containers will be created or resolved if the zone name is not explicitly specified with the container name.", + }, { Name: "container_creation_policy", Default: "private", @@ -166,6 +171,7 @@ type Options struct { Address string `config:"address"` Password string `config:"password"` PlacementPolicy string `config:"placement_policy"` + DefaultContainerZone string `config:"default_container_zone"` ContainerCreationPolicy string `config:"container_creation_policy"` APERules []chain.Rule `config:"-"` } @@ -501,26 +507,26 @@ func (f *Fs) Features() *fs.Features { // List the objects and directories in dir into entries. func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { - containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) + rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) - if containerStr == "" { + if rootDirName == "" { if containerPath != "" { return nil, fs.ErrorListBucketRequired } return f.listContainers(ctx) } - return f.listEntries(ctx, containerStr, containerPath, dir, false) + return f.listEntries(ctx, rootDirName, containerPath, dir, false) } // ListR lists the objects and directories of the Fs starting // from dir recursively into out. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error { - containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) + rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) list := walk.NewListRHelper(callback) - if containerStr == "" { + if rootDirName == "" { if containerPath != "" { return fs.ErrorListBucketRequired } @@ -536,15 +542,15 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) e return list.Flush() } - if err := f.listR(ctx, list, containerStr, containerPath, dir); err != nil { + if err := f.listR(ctx, list, rootDirName, containerPath, dir); err != nil { return err } return list.Flush() } -func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, containerStr, containerPath, dir string) error { - entries, err := f.listEntries(ctx, containerStr, containerPath, dir, true) +func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, rootDirName, containerPath, dir string) error { + entries, err := f.listEntries(ctx, rootDirName, containerPath, dir, true) if err != nil { return err } @@ -557,31 +563,36 @@ func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, containerStr, co return nil } -func (f *Fs) resolveOrCreateContainer(ctx context.Context, containerStr string) (cid.ID, error) { +func (f *Fs) resolveOrCreateContainer(ctx context.Context, rootDirName string) (cid.ID, error) { + // Due to the fact that this method is called when performing "put" operations, + // which can be run in parallel in several goroutines, + // we need to use a global lock here so that if a requested container is missing, + // multiple goroutines do not attempt to create a container with the same name simultaneously, + // which could cause unexpected behavior. f.m.Lock() defer f.m.Unlock() - cnrID, err := f.resolveContainerIDHelper(ctx, containerStr) + cnrID, err := f.resolveContainerIDHelper(ctx, rootDirName) if err == nil { return cnrID, err } - if cnrID, err = f.createContainer(ctx, containerStr); err != nil { - delete(f.containerIDCache, containerStr) + if cnrID, err = f.createContainer(ctx, rootDirName); err != nil { + delete(f.containerIDCache, rootDirName) return cid.ID{}, fmt.Errorf("createContainer: %w", err) } - f.containerIDCache[containerStr] = cnrID.String() + f.containerIDCache[rootDirName] = cnrID.String() return cnrID, nil } // Put the Object into the container func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - containerStr, containerPath := bucket.Split(filepath.Join(f.root, src.Remote())) + rootDirName, containerPath := bucket.Split(filepath.Join(f.root, src.Remote())) - cnrID, err := parseContainerID(containerStr) + cnrID, err := parseContainerID(rootDirName) if err != nil { - if cnrID, err = f.resolveOrCreateContainer(ctx, containerStr); err != nil { + if cnrID, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil { return nil, err } } @@ -663,10 +674,10 @@ func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, option func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { // When updating an object, the path to it should not change. src = robject.NewStaticObjectInfo(o.Remote(), src.ModTime(ctx), src.Size(), src.Storable(), nil, src.Fs()) - containerStr, containerPath := bucket.Split(filepath.Join(o.fs.root, src.Remote())) + rootDirName, containerPath := bucket.Split(filepath.Join(o.fs.root, src.Remote())) var cnrID cid.ID var err error - if cnrID, err = o.fs.parseContainer(ctx, containerStr); err != nil { + if cnrID, err = o.fs.parseContainer(ctx, rootDirName); err != nil { return fmt.Errorf("parse container: %w", err) } @@ -718,6 +729,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return nil } +func (f *Fs) getContainerNameAndZone(containerStr string) (string, string) { + return getContainerNameAndZone(containerStr, f.opt.DefaultContainerZone) +} + // Remove an object func (o *Object) Remove(ctx context.Context) error { cnrID, _ := o.ContainerID() @@ -805,7 +820,7 @@ func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Ch } } -func (f *Fs) createContainer(ctx context.Context, containerName string) (cid.ID, error) { +func (f *Fs) createContainer(ctx context.Context, rootDirName string) (cid.ID, error) { var policy netmap.PlacementPolicy if err := policy.DecodeString(f.opt.PlacementPolicy); err != nil { return cid.ID{}, fmt.Errorf("parse placement policy: %w", err) @@ -817,10 +832,12 @@ func (f *Fs) createContainer(ctx context.Context, containerName string) (cid.ID, cnr.SetOwner(*f.owner) container.SetCreationTime(&cnr, time.Now()) - container.SetName(&cnr, containerName) + container.SetName(&cnr, rootDirName) + cnrName, cnrZone := f.getContainerNameAndZone(rootDirName) var domain container.Domain - domain.SetName(containerName) + domain.SetZone(cnrZone) + domain.SetName(cnrName) container.WriteDomain(&cnr, domain) if err := pool.SyncContainerWithNetwork(ctx, &cnr, f.pool); err != nil { @@ -866,14 +883,14 @@ func (f *Fs) createContainer(ctx context.Context, containerName string) (cid.ID, // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { - containerStr, _ := bucket.Split(path.Join(f.root, dir)) - if containerStr == "" { + rootDirName, _ := bucket.Split(path.Join(f.root, dir)) + if rootDirName == "" { return nil } - _, err := parseContainerID(containerStr) + _, err := parseContainerID(rootDirName) if err != nil { - if _, err = f.resolveOrCreateContainer(ctx, containerStr); err != nil { + if _, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil { return err } } @@ -883,12 +900,12 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error { // Rmdir deletes the bucket if the fs is at the root func (f *Fs) Rmdir(ctx context.Context, dir string) error { - containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) - if containerStr == "" || containerPath != "" { + rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) + if rootDirName == "" || containerPath != "" { return nil } - cnrID, err := f.parseContainer(ctx, containerStr) + cnrID, err := f.parseContainer(ctx, rootDirName) if err != nil { return fs.ErrorDirNotFound } @@ -908,18 +925,18 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error { f.m.Lock() defer f.m.Unlock() if err = f.pool.DeleteContainer(ctx, prm); err != nil { - return fmt.Errorf("couldn't delete container %s '%s': %w", cnrID, containerStr, err) + return fmt.Errorf("couldn't delete container %s '%s': %w", cnrID, rootDirName, err) } - delete(f.containerIDCache, containerStr) + delete(f.containerIDCache, rootDirName) return nil } // Purge deletes all the files and directories including the old versions. func (f *Fs) Purge(ctx context.Context, dir string) error { - containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) + rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) - cnrID, err := f.parseContainer(ctx, containerStr) + cnrID, err := f.parseContainer(ctx, rootDirName) if err != nil { return nil } @@ -944,8 +961,9 @@ func parseContainerID(containerStr string) (cid.ID, error) { return cnrID, err } -func getContainerIDByName(dirEntry fs.DirEntry, containerName string) (ok bool, cnrID cid.ID, err error) { - if dirEntry.Remote() != containerName { +func getContainerIDByNameAndZone(dirEntry fs.DirEntry, cnrName, cnrZone, defaultZone string) (cnrID cid.ID, ok bool, err error) { + actualName, actualZone := getContainerNameAndZone(dirEntry.Remote(), defaultZone) + if cnrName != actualName || cnrZone != actualZone { return } var idEr fs.IDer @@ -956,61 +974,66 @@ func getContainerIDByName(dirEntry fs.DirEntry, containerName string) (ok bool, return } -func resolveContainerIDWithNNS(resolver *resolver.NNS, containerName string) (cid.ID, error) { +func resolveContainerIDWithNNS(resolver *resolver.NNS, cnrName, cnrZone string) (cid.ID, error) { var d container.Domain - d.SetName(containerName) + d.SetZone(cnrZone) + d.SetName(cnrName) if cnrID, err := resolver.ResolveContainerDomain(d); err == nil { return cnrID, err } - return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", containerName) + return cid.ID{}, fmt.Errorf("couldn't resolve container with name '%s' and zone '%s'", cnrName, cnrZone) } -func (f *Fs) resolveContainerIDHelper(ctx context.Context, containerName string) (cid.ID, error) { - cnrIDStr, ok := f.containerIDCache[containerName] - if ok { - return parseContainerID(cnrIDStr) +func (f *Fs) resolveCIDByRootDirName(ctx context.Context, rootDirName string) (cid.ID, error) { + cnrName, cnrZone := f.getContainerNameAndZone(rootDirName) + if cnrName == "" { + return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName) } if f.resolver != nil { - var err error - var cnrID cid.ID - if cnrID, err = resolveContainerIDWithNNS(f.resolver, containerName); err == nil { - f.containerIDCache[containerName] = cnrID.String() - } - return cnrID, err + return resolveContainerIDWithNNS(f.resolver, cnrName, cnrZone) } if dirEntries, err := f.listContainers(ctx); err == nil { for _, dirEntry := range dirEntries { - if ok, cnrID, err := getContainerIDByName(dirEntry, containerName); ok { - if err == nil { - f.containerIDCache[containerName] = cnrID.String() - } + if cnrID, ok, err := getContainerIDByNameAndZone(dirEntry, cnrName, cnrZone, f.opt.DefaultContainerZone); ok { return cnrID, err } } } - return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", containerName) + return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName) } -func (f *Fs) resolveContainerID(ctx context.Context, containerName string) (cid.ID, error) { +func (f *Fs) resolveContainerIDHelper(ctx context.Context, rootDirName string) (cid.ID, error) { + cnrIDStr, ok := f.containerIDCache[rootDirName] + if ok { + return parseContainerID(cnrIDStr) + } + cnrID, err := f.resolveCIDByRootDirName(ctx, rootDirName) + if err != nil { + return cid.ID{}, err + } + f.containerIDCache[rootDirName] = cnrID.String() + return cnrID, nil +} + +func (f *Fs) resolveContainerID(ctx context.Context, rootDirName string) (cid.ID, error) { f.m.Lock() defer f.m.Unlock() - return f.resolveContainerIDHelper(ctx, containerName) + return f.resolveContainerIDHelper(ctx, rootDirName) } -func (f *Fs) parseContainer(ctx context.Context, containerName string) (cid.ID, error) { - cnrID, err := parseContainerID(containerName) - if err == nil { - return cnrID, err +func (f *Fs) parseContainer(ctx context.Context, rootDirName string) (cid.ID, error) { + cnrID, err := parseContainerID(rootDirName) + if err != nil { + return f.resolveContainerID(ctx, rootDirName) } - - return f.resolveContainerID(ctx, containerName) + return cnrID, nil } -func (f *Fs) listEntries(ctx context.Context, containerStr, containerPath, directory string, recursive bool) (fs.DirEntries, error) { - cnrID, err := f.parseContainer(ctx, containerStr) +func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, directory string, recursive bool) (fs.DirEntries, error) { + cnrID, err := f.parseContainer(ctx, rootDirName) if err != nil { return nil, fs.ErrorDirNotFound } @@ -1033,7 +1056,7 @@ func (f *Fs) listEntries(ctx context.Context, containerStr, containerPath, direc return nil, err } - objInf := newObject(f, obj, containerStr) + objInf := newObject(f, obj, rootDirName) if !recursive { withoutPath := strings.TrimPrefix(objInf.filePath, containerPath) @@ -1083,7 +1106,7 @@ func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) { return nil, fmt.Errorf("couldn't get container '%s': %w", containerID, err) } - res[i] = newDir(containerID, cnr) + res[i] = newDir(containerID, cnr, f.opt.DefaultContainerZone) } return res, nil diff --git a/backend/frostfs/util.go b/backend/frostfs/util.go index 16e171234..cec137933 100644 --- a/backend/frostfs/util.go +++ b/backend/frostfs/util.go @@ -296,16 +296,31 @@ func formObject(own *user.ID, cnrID cid.ID, name string, header map[string]strin return obj } -func newDir(cnrID cid.ID, cnr container.Container) *fs.Dir { +func newDir(cnrID cid.ID, cnr container.Container, defaultZone string) *fs.Dir { remote := cnrID.EncodeToString() timestamp := container.CreatedAt(cnr) if domain := container.ReadDomain(cnr); domain.Name() != "" { - remote = domain.Name() + if defaultZone != domain.Zone() { + remote = domain.Name() + "." + domain.Zone() + } else { + remote = domain.Name() + } } dir := fs.NewDir(remote, timestamp) dir.SetID(cnrID.String()) - return dir } + +func getContainerNameAndZone(containerStr, defaultZone string) (cnrName string, cnrZone string) { + defer func() { + if len(cnrZone) == 0 { + cnrZone = defaultZone + } + }() + if idx := strings.Index(containerStr, "."); idx >= 0 { + return containerStr[:idx], containerStr[idx+1:] + } + return containerStr, defaultZone +} diff --git a/backend/frostfs/util_test.go b/backend/frostfs/util_test.go index c241949a3..eaee58e00 100644 --- a/backend/frostfs/util_test.go +++ b/backend/frostfs/util_test.go @@ -7,6 +7,57 @@ import ( "github.com/stretchr/testify/require" ) +func TestGetZoneAndContainerNames(t *testing.T) { + for i, tc := range []struct { + cnrStr string + defZone string + expectedName string + expectedZone string + }{ + { + cnrStr: "", + defZone: "def_zone", + expectedName: "", + expectedZone: "def_zone", + }, + { + cnrStr: "", + defZone: "def_zone", + expectedName: "", + expectedZone: "def_zone", + }, + { + cnrStr: "cnr_name", + defZone: "def_zone", + expectedName: "cnr_name", + expectedZone: "def_zone", + }, + { + cnrStr: "cnr_name.", + defZone: "def_zone", + expectedName: "cnr_name", + expectedZone: "def_zone", + }, + { + cnrStr: ".cnr_zone", + defZone: "def_zone", + expectedName: "", + expectedZone: "cnr_zone", + }, { + cnrStr: ".cnr_zone", + defZone: "def_zone", + expectedName: "", + expectedZone: "cnr_zone", + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + actualName, actualZone := getContainerNameAndZone(tc.cnrStr, tc.defZone) + require.Equal(t, tc.expectedZone, actualZone) + require.Equal(t, tc.expectedName, actualName) + }) + } +} + func TestParseContainerCreationPolicy(t *testing.T) { for i, tc := range []struct { ACLString string