[#5] Add container zone names support.

Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
Aleksey Kravchenko 2025-01-22 19:07:48 +03:00
parent b93e134b5b
commit be95531bfd
3 changed files with 156 additions and 67 deletions

View file

@ -127,6 +127,11 @@ func init() {
}, },
}, },
}, },
{
Name: "default_container_zone",
Default: "container",
Help: "The name of the zone in which containers will be created or resolved if the zone name is not explicitly specified with the container name.",
},
{ {
Name: "container_creation_policy", Name: "container_creation_policy",
Default: "private", Default: "private",
@ -166,6 +171,7 @@ type Options struct {
Address string `config:"address"` Address string `config:"address"`
Password string `config:"password"` Password string `config:"password"`
PlacementPolicy string `config:"placement_policy"` PlacementPolicy string `config:"placement_policy"`
DefaultContainerZone string `config:"default_container_zone"`
ContainerCreationPolicy string `config:"container_creation_policy"` ContainerCreationPolicy string `config:"container_creation_policy"`
APERules []chain.Rule `config:"-"` APERules []chain.Rule `config:"-"`
} }
@ -501,26 +507,26 @@ func (f *Fs) Features() *fs.Features {
// List the objects and directories in dir into entries. // List the objects and directories in dir into entries.
func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
if containerStr == "" { if rootDirName == "" {
if containerPath != "" { if containerPath != "" {
return nil, fs.ErrorListBucketRequired return nil, fs.ErrorListBucketRequired
} }
return f.listContainers(ctx) return f.listContainers(ctx)
} }
return f.listEntries(ctx, containerStr, containerPath, dir, false) return f.listEntries(ctx, rootDirName, containerPath, dir, false)
} }
// ListR lists the objects and directories of the Fs starting // ListR lists the objects and directories of the Fs starting
// from dir recursively into out. // from dir recursively into out.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error { func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error {
containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
list := walk.NewListRHelper(callback) list := walk.NewListRHelper(callback)
if containerStr == "" { if rootDirName == "" {
if containerPath != "" { if containerPath != "" {
return fs.ErrorListBucketRequired return fs.ErrorListBucketRequired
} }
@ -536,15 +542,15 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) e
return list.Flush() return list.Flush()
} }
if err := f.listR(ctx, list, containerStr, containerPath, dir); err != nil { if err := f.listR(ctx, list, rootDirName, containerPath, dir); err != nil {
return err return err
} }
return list.Flush() return list.Flush()
} }
func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, containerStr, containerPath, dir string) error { func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, rootDirName, containerPath, dir string) error {
entries, err := f.listEntries(ctx, containerStr, containerPath, dir, true) entries, err := f.listEntries(ctx, rootDirName, containerPath, dir, true)
if err != nil { if err != nil {
return err return err
} }
@ -557,31 +563,36 @@ func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, containerStr, co
return nil return nil
} }
func (f *Fs) resolveOrCreateContainer(ctx context.Context, containerStr string) (cid.ID, error) { func (f *Fs) resolveOrCreateContainer(ctx context.Context, rootDirName string) (cid.ID, error) {
// Due to the fact that this method is called when performing "put" operations,
// which can be run in parallel in several goroutines,
// we need to use a global lock here so that if a requested container is missing,
// multiple goroutines do not attempt to create a container with the same name simultaneously,
// which could cause unexpected behavior.
f.m.Lock() f.m.Lock()
defer f.m.Unlock() defer f.m.Unlock()
cnrID, err := f.resolveContainerIDHelper(ctx, containerStr) cnrID, err := f.resolveContainerIDHelper(ctx, rootDirName)
if err == nil { if err == nil {
return cnrID, err return cnrID, err
} }
if cnrID, err = f.createContainer(ctx, containerStr); err != nil { if cnrID, err = f.createContainer(ctx, rootDirName); err != nil {
delete(f.containerIDCache, containerStr) delete(f.containerIDCache, rootDirName)
return cid.ID{}, fmt.Errorf("createContainer: %w", err) return cid.ID{}, fmt.Errorf("createContainer: %w", err)
} }
f.containerIDCache[containerStr] = cnrID.String() f.containerIDCache[rootDirName] = cnrID.String()
return cnrID, nil return cnrID, nil
} }
// Put the Object into the container // Put the Object into the container
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
containerStr, containerPath := bucket.Split(filepath.Join(f.root, src.Remote())) rootDirName, containerPath := bucket.Split(filepath.Join(f.root, src.Remote()))
cnrID, err := parseContainerID(containerStr) cnrID, err := parseContainerID(rootDirName)
if err != nil { if err != nil {
if cnrID, err = f.resolveOrCreateContainer(ctx, containerStr); err != nil { if cnrID, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil {
return nil, err return nil, err
} }
} }
@ -663,10 +674,10 @@ func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, option
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
// When updating an object, the path to it should not change. // When updating an object, the path to it should not change.
src = robject.NewStaticObjectInfo(o.Remote(), src.ModTime(ctx), src.Size(), src.Storable(), nil, src.Fs()) src = robject.NewStaticObjectInfo(o.Remote(), src.ModTime(ctx), src.Size(), src.Storable(), nil, src.Fs())
containerStr, containerPath := bucket.Split(filepath.Join(o.fs.root, src.Remote())) rootDirName, containerPath := bucket.Split(filepath.Join(o.fs.root, src.Remote()))
var cnrID cid.ID var cnrID cid.ID
var err error var err error
if cnrID, err = o.fs.parseContainer(ctx, containerStr); err != nil { if cnrID, err = o.fs.parseContainer(ctx, rootDirName); err != nil {
return fmt.Errorf("parse container: %w", err) return fmt.Errorf("parse container: %w", err)
} }
@ -718,6 +729,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return nil return nil
} }
func (f *Fs) getContainerNameAndZone(containerStr string) (string, string) {
return getContainerNameAndZone(containerStr, f.opt.DefaultContainerZone)
}
// Remove an object // Remove an object
func (o *Object) Remove(ctx context.Context) error { func (o *Object) Remove(ctx context.Context) error {
cnrID, _ := o.ContainerID() cnrID, _ := o.ContainerID()
@ -805,7 +820,7 @@ func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Ch
} }
} }
func (f *Fs) createContainer(ctx context.Context, containerName string) (cid.ID, error) { func (f *Fs) createContainer(ctx context.Context, rootDirName string) (cid.ID, error) {
var policy netmap.PlacementPolicy var policy netmap.PlacementPolicy
if err := policy.DecodeString(f.opt.PlacementPolicy); err != nil { if err := policy.DecodeString(f.opt.PlacementPolicy); err != nil {
return cid.ID{}, fmt.Errorf("parse placement policy: %w", err) return cid.ID{}, fmt.Errorf("parse placement policy: %w", err)
@ -817,10 +832,12 @@ func (f *Fs) createContainer(ctx context.Context, containerName string) (cid.ID,
cnr.SetOwner(*f.owner) cnr.SetOwner(*f.owner)
container.SetCreationTime(&cnr, time.Now()) container.SetCreationTime(&cnr, time.Now())
container.SetName(&cnr, containerName) container.SetName(&cnr, rootDirName)
cnrName, cnrZone := f.getContainerNameAndZone(rootDirName)
var domain container.Domain var domain container.Domain
domain.SetName(containerName) domain.SetZone(cnrZone)
domain.SetName(cnrName)
container.WriteDomain(&cnr, domain) container.WriteDomain(&cnr, domain)
if err := pool.SyncContainerWithNetwork(ctx, &cnr, f.pool); err != nil { if err := pool.SyncContainerWithNetwork(ctx, &cnr, f.pool); err != nil {
@ -866,14 +883,14 @@ func (f *Fs) createContainer(ctx context.Context, containerName string) (cid.ID,
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) error { func (f *Fs) Mkdir(ctx context.Context, dir string) error {
containerStr, _ := bucket.Split(path.Join(f.root, dir)) rootDirName, _ := bucket.Split(path.Join(f.root, dir))
if containerStr == "" { if rootDirName == "" {
return nil return nil
} }
_, err := parseContainerID(containerStr) _, err := parseContainerID(rootDirName)
if err != nil { if err != nil {
if _, err = f.resolveOrCreateContainer(ctx, containerStr); err != nil { if _, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil {
return err return err
} }
} }
@ -883,12 +900,12 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
// Rmdir deletes the bucket if the fs is at the root // Rmdir deletes the bucket if the fs is at the root
func (f *Fs) Rmdir(ctx context.Context, dir string) error { func (f *Fs) Rmdir(ctx context.Context, dir string) error {
containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
if containerStr == "" || containerPath != "" { if rootDirName == "" || containerPath != "" {
return nil return nil
} }
cnrID, err := f.parseContainer(ctx, containerStr) cnrID, err := f.parseContainer(ctx, rootDirName)
if err != nil { if err != nil {
return fs.ErrorDirNotFound return fs.ErrorDirNotFound
} }
@ -908,18 +925,18 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
f.m.Lock() f.m.Lock()
defer f.m.Unlock() defer f.m.Unlock()
if err = f.pool.DeleteContainer(ctx, prm); err != nil { if err = f.pool.DeleteContainer(ctx, prm); err != nil {
return fmt.Errorf("couldn't delete container %s '%s': %w", cnrID, containerStr, err) return fmt.Errorf("couldn't delete container %s '%s': %w", cnrID, rootDirName, err)
} }
delete(f.containerIDCache, containerStr) delete(f.containerIDCache, rootDirName)
return nil return nil
} }
// Purge deletes all the files and directories including the old versions. // Purge deletes all the files and directories including the old versions.
func (f *Fs) Purge(ctx context.Context, dir string) error { func (f *Fs) Purge(ctx context.Context, dir string) error {
containerStr, containerPath := bucket.Split(path.Join(f.root, dir)) rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
cnrID, err := f.parseContainer(ctx, containerStr) cnrID, err := f.parseContainer(ctx, rootDirName)
if err != nil { if err != nil {
return nil return nil
} }
@ -944,8 +961,9 @@ func parseContainerID(containerStr string) (cid.ID, error) {
return cnrID, err return cnrID, err
} }
func getContainerIDByName(dirEntry fs.DirEntry, containerName string) (ok bool, cnrID cid.ID, err error) { func getContainerIDByNameAndZone(dirEntry fs.DirEntry, cnrName, cnrZone, defaultZone string) (cnrID cid.ID, ok bool, err error) {
if dirEntry.Remote() != containerName { actualName, actualZone := getContainerNameAndZone(dirEntry.Remote(), defaultZone)
if cnrName != actualName || cnrZone != actualZone {
return return
} }
var idEr fs.IDer var idEr fs.IDer
@ -956,61 +974,66 @@ func getContainerIDByName(dirEntry fs.DirEntry, containerName string) (ok bool,
return return
} }
func resolveContainerIDWithNNS(resolver *resolver.NNS, containerName string) (cid.ID, error) { func resolveContainerIDWithNNS(resolver *resolver.NNS, cnrName, cnrZone string) (cid.ID, error) {
var d container.Domain var d container.Domain
d.SetName(containerName) d.SetZone(cnrZone)
d.SetName(cnrName)
if cnrID, err := resolver.ResolveContainerDomain(d); err == nil { if cnrID, err := resolver.ResolveContainerDomain(d); err == nil {
return cnrID, err return cnrID, err
} }
return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", containerName) return cid.ID{}, fmt.Errorf("couldn't resolve container with name '%s' and zone '%s'", cnrName, cnrZone)
} }
func (f *Fs) resolveContainerIDHelper(ctx context.Context, containerName string) (cid.ID, error) { func (f *Fs) resolveCIDByRootDirName(ctx context.Context, rootDirName string) (cid.ID, error) {
cnrIDStr, ok := f.containerIDCache[containerName] cnrName, cnrZone := f.getContainerNameAndZone(rootDirName)
if ok { if cnrName == "" {
return parseContainerID(cnrIDStr) return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName)
} }
if f.resolver != nil { if f.resolver != nil {
var err error return resolveContainerIDWithNNS(f.resolver, cnrName, cnrZone)
var cnrID cid.ID
if cnrID, err = resolveContainerIDWithNNS(f.resolver, containerName); err == nil {
f.containerIDCache[containerName] = cnrID.String()
}
return cnrID, err
} }
if dirEntries, err := f.listContainers(ctx); err == nil { if dirEntries, err := f.listContainers(ctx); err == nil {
for _, dirEntry := range dirEntries { for _, dirEntry := range dirEntries {
if ok, cnrID, err := getContainerIDByName(dirEntry, containerName); ok { if cnrID, ok, err := getContainerIDByNameAndZone(dirEntry, cnrName, cnrZone, f.opt.DefaultContainerZone); ok {
if err == nil {
f.containerIDCache[containerName] = cnrID.String()
}
return cnrID, err return cnrID, err
} }
} }
} }
return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", containerName) return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName)
} }
func (f *Fs) resolveContainerID(ctx context.Context, containerName string) (cid.ID, error) { func (f *Fs) resolveContainerIDHelper(ctx context.Context, rootDirName string) (cid.ID, error) {
cnrIDStr, ok := f.containerIDCache[rootDirName]
if ok {
return parseContainerID(cnrIDStr)
}
cnrID, err := f.resolveCIDByRootDirName(ctx, rootDirName)
if err != nil {
return cid.ID{}, err
}
f.containerIDCache[rootDirName] = cnrID.String()
return cnrID, nil
}
func (f *Fs) resolveContainerID(ctx context.Context, rootDirName string) (cid.ID, error) {
f.m.Lock() f.m.Lock()
defer f.m.Unlock() defer f.m.Unlock()
return f.resolveContainerIDHelper(ctx, containerName) return f.resolveContainerIDHelper(ctx, rootDirName)
} }
func (f *Fs) parseContainer(ctx context.Context, containerName string) (cid.ID, error) { func (f *Fs) parseContainer(ctx context.Context, rootDirName string) (cid.ID, error) {
cnrID, err := parseContainerID(containerName) cnrID, err := parseContainerID(rootDirName)
if err == nil { if err != nil {
return cnrID, err return f.resolveContainerID(ctx, rootDirName)
} }
return cnrID, nil
return f.resolveContainerID(ctx, containerName)
} }
func (f *Fs) listEntries(ctx context.Context, containerStr, containerPath, directory string, recursive bool) (fs.DirEntries, error) { func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, directory string, recursive bool) (fs.DirEntries, error) {
cnrID, err := f.parseContainer(ctx, containerStr) cnrID, err := f.parseContainer(ctx, rootDirName)
if err != nil { if err != nil {
return nil, fs.ErrorDirNotFound return nil, fs.ErrorDirNotFound
} }
@ -1033,7 +1056,7 @@ func (f *Fs) listEntries(ctx context.Context, containerStr, containerPath, direc
return nil, err return nil, err
} }
objInf := newObject(f, obj, containerStr) objInf := newObject(f, obj, rootDirName)
if !recursive { if !recursive {
withoutPath := strings.TrimPrefix(objInf.filePath, containerPath) withoutPath := strings.TrimPrefix(objInf.filePath, containerPath)
@ -1083,7 +1106,7 @@ func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) {
return nil, fmt.Errorf("couldn't get container '%s': %w", containerID, err) return nil, fmt.Errorf("couldn't get container '%s': %w", containerID, err)
} }
res[i] = newDir(containerID, cnr) res[i] = newDir(containerID, cnr, f.opt.DefaultContainerZone)
} }
return res, nil return res, nil

View file

@ -296,16 +296,31 @@ func formObject(own *user.ID, cnrID cid.ID, name string, header map[string]strin
return obj return obj
} }
func newDir(cnrID cid.ID, cnr container.Container) *fs.Dir { func newDir(cnrID cid.ID, cnr container.Container, defaultZone string) *fs.Dir {
remote := cnrID.EncodeToString() remote := cnrID.EncodeToString()
timestamp := container.CreatedAt(cnr) timestamp := container.CreatedAt(cnr)
if domain := container.ReadDomain(cnr); domain.Name() != "" { if domain := container.ReadDomain(cnr); domain.Name() != "" {
remote = domain.Name() if defaultZone != domain.Zone() {
remote = domain.Name() + "." + domain.Zone()
} else {
remote = domain.Name()
}
} }
dir := fs.NewDir(remote, timestamp) dir := fs.NewDir(remote, timestamp)
dir.SetID(cnrID.String()) dir.SetID(cnrID.String())
return dir return dir
} }
func getContainerNameAndZone(containerStr, defaultZone string) (cnrName string, cnrZone string) {
defer func() {
if len(cnrZone) == 0 {
cnrZone = defaultZone
}
}()
if idx := strings.Index(containerStr, "."); idx >= 0 {
return containerStr[:idx], containerStr[idx+1:]
}
return containerStr, defaultZone
}

View file

@ -7,6 +7,57 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestGetZoneAndContainerNames(t *testing.T) {
for i, tc := range []struct {
cnrStr string
defZone string
expectedName string
expectedZone string
}{
{
cnrStr: "",
defZone: "def_zone",
expectedName: "",
expectedZone: "def_zone",
},
{
cnrStr: "",
defZone: "def_zone",
expectedName: "",
expectedZone: "def_zone",
},
{
cnrStr: "cnr_name",
defZone: "def_zone",
expectedName: "cnr_name",
expectedZone: "def_zone",
},
{
cnrStr: "cnr_name.",
defZone: "def_zone",
expectedName: "cnr_name",
expectedZone: "def_zone",
},
{
cnrStr: ".cnr_zone",
defZone: "def_zone",
expectedName: "",
expectedZone: "cnr_zone",
}, {
cnrStr: ".cnr_zone",
defZone: "def_zone",
expectedName: "",
expectedZone: "cnr_zone",
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
actualName, actualZone := getContainerNameAndZone(tc.cnrStr, tc.defZone)
require.Equal(t, tc.expectedZone, actualZone)
require.Equal(t, tc.expectedName, actualName)
})
}
}
func TestParseContainerCreationPolicy(t *testing.T) { func TestParseContainerCreationPolicy(t *testing.T) {
for i, tc := range []struct { for i, tc := range []struct {
ACLString string ACLString string