// Package frostfs provides an interface to FrostFS decentralized distributed object storage system package frostfs import ( "bytes" "context" "encoding/hex" "fmt" "io" "math" "path" "path/filepath" "strconv" "strings" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape" sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/hash" robject "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/bucket" ) func init() { fs.Register(&fs.RegInfo{ Name: "frostfs", Description: "Distributed, decentralized object storage FrostFS", NewFs: NewFs, Options: []fs.Option{ { Name: "endpoint", Help: "Endpoints to connect to FrostFS node", Required: true, Examples: []fs.OptionExample{ { Value: "s01.frostfs.devenv:8080", Help: "One endpoint.", }, { Value: "s01.frostfs.devenv:8080 s02.frostfs.devenv:8080", Help: "Multiple endpoints to form pool.", }, { Value: "s01.frostfs.devenv:8080,1 s02.frostfs.devenv:8080,2", Help: "Multiple endpoints with priority (lower value has higher priority). Until s01 is healthy, all requests will be sent to it.", }, { Value: "s01.frostfs.devenv:8080,1,1 s02.frostfs.devenv:8080,2,1 s03.frostfs.devenv:8080,2,9", Help: "Multiple endpoints with priority and weights. After the s01 endpoint is unhealthy, requests will be sent to the s02 and s03 endpoints in proportions of 10% and 90%, respectively.", }, }, }, { Name: "connection_timeout", Default: 4 * time.Second, Help: "FrostFS connection timeout", }, { Name: "request_timeout", Default: 12 * time.Second, Help: "FrostFS request timeout", }, { Name: "rebalance_interval", Default: 15 * time.Second, Help: "FrostFS rebalance connections interval", }, { Name: "session_expiration", Default: math.MaxUint32, Help: "FrostFS session expiration epoch", }, { Name: "ape_cache_invalidation_duration", Default: 8 * time.Second, Help: "APE cache invalidation duration", }, { Name: "ape_cache_invalidation_timeout", Default: 24 * time.Second, Help: "APE cache invalidation timeout", }, { Name: "ape_chain_check_interval", Default: 500 * time.Millisecond, Help: "The interval for verifying that the APE chain is saved in FrostFS.", }, { Name: "rpc_endpoint", Help: "Endpoint to connect to Neo rpc node", }, { Name: "wallet", Help: "Path to wallet", Required: true, }, { Name: "address", Help: "Address of account", }, { Name: "password", Help: "Password to decrypt wallet", }, { Name: "placement_policy", Default: "REP 3", Help: "Placement policy for new containers", Examples: []fs.OptionExample{ { Value: "REP 3", Help: "Container will have 3 replicas", }, }, }, { Name: "default_container_zone", Default: "container", Help: "The name of the zone in which containers will be created or resolved if the zone name is not explicitly specified with the container name.", }, { Name: "container_creation_policy", Default: "private", Help: "Container creation policy for new containers", Examples: []fs.OptionExample{ { Value: "public-read-write", Help: "Public container, anyone can read and write", }, { Value: "public-read", Help: "Public container, owner can read and write, others can only read", }, { Value: "private", Help: "Private container, only owner has access to it", }, }, }, }, }) } // Options defines the configuration for this backend type Options struct { FrostfsEndpoint string `config:"endpoint"` FrostfsConnectionTimeout fs.Duration `config:"connection_timeout"` FrostfsRequestTimeout fs.Duration `config:"request_timeout"` FrostfsRebalanceInterval fs.Duration `config:"rebalance_interval"` FrostfsSessionExpiration uint64 `config:"session_expiration"` APECacheInvalidationDuration fs.Duration `config:"ape_cache_invalidation_duration"` APECacheInvalidationTimeout fs.Duration `config:"ape_cache_invalidation_timeout"` APEChainCheckInterval fs.Duration `config:"ape_chain_check_interval"` RPCEndpoint string `config:"rpc_endpoint"` Wallet string `config:"wallet"` Address string `config:"address"` Password string `config:"password"` PlacementPolicy string `config:"placement_policy"` DefaultContainerZone string `config:"default_container_zone"` ContainerCreationPolicy string `config:"container_creation_policy"` APERules []chain.Rule `config:"-"` } // Fs represents a remote frostfs server type Fs struct { name string // the name of the remote root string // root of the bucket - ignore all objects above this opt *Options ctx context.Context pool *pool.Pool owner *user.ID rootContainer string rootDirectory string features *fs.Features resolver *resolver.NNS m sync.Mutex containerIDCache map[string]string } type searchFilter struct { Header string Value string MatchType object.SearchMatchType } // Object describes a frostfs object type Object struct { *object.Object fs *Fs name string remote string filePath string contentType string timestamp time.Time } // Shutdown the backend, closing any background tasks and any // cached connections. func (f *Fs) Shutdown(_ context.Context) error { f.pool.Close() return nil } // PutStream uploads to the remote path with the modTime given of indeterminate size func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { return f.Put(ctx, in, src, options...) } // NewFs creates a new Fs object from the name and root. It connects to // the host specified in the config file. func NewFs(ctx context.Context, name string, root string, m configmap.Mapper) (fs.Fs, error) { opt := new(Options) err := configstruct.Set(m, opt) if err != nil { return nil, err } opt.APERules, err = parseContainerCreationPolicyString(opt.ContainerCreationPolicy) if err != nil { return nil, fmt.Errorf("couldn't parse container creation policy: %w", err) } acc, err := getAccount(opt) if err != nil { return nil, err } p, err := createPool(ctx, acc.PrivateKey(), opt) if err != nil { return nil, err } nnsResolver, err := createNNSResolver(opt) if err != nil { return nil, err } var owner user.ID user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey) f := &Fs{ name: name, opt: opt, ctx: ctx, pool: p, owner: &owner, resolver: nnsResolver, containerIDCache: make(map[string]string), } f.setRoot(root) f.features = (&fs.Features{ DuplicateFiles: true, ReadMimeType: true, WriteMimeType: true, BucketBased: true, BucketBasedRootOK: true, }).Fill(ctx, f) if f.rootContainer != "" && f.rootDirectory != "" && !strings.HasSuffix(root, "/") { // Check to see if the (container,directory) is actually an existing file oldRoot := f.root newRoot, leaf := path.Split(oldRoot) f.setRoot(newRoot) _, err := f.NewObject(ctx, leaf) if err != nil { // File doesn't exist or is a directory so return old f f.setRoot(oldRoot) return f, nil } // return an error with a fs which points to the parent return f, fs.ErrorIsFile } return f, nil } func newObject(f *Fs, obj object.Object, container string) *Object { // we should not include rootDirectory into remote name prefix := f.rootDirectory if prefix != "" { prefix += "/" } objID, _ := obj.ID() objInfo := &Object{ Object: &obj, fs: f, name: objID.EncodeToString(), } for _, attr := range obj.Attributes() { if attr.Key() == object.AttributeFileName { objInfo.name = attr.Value() } if attr.Key() == object.AttributeFilePath { objInfo.filePath = attr.Value() } if attr.Key() == object.AttributeContentType { objInfo.contentType = attr.Value() } if attr.Key() == object.AttributeTimestamp { value, err := strconv.ParseInt(attr.Value(), 10, 64) if err != nil { continue } objInfo.timestamp = time.Unix(value, 0) } } if objInfo.filePath == "" { objInfo.filePath = objInfo.name } objInfo.remote = objInfo.filePath if strings.Contains(objInfo.remote, prefix) { objInfo.remote = objInfo.remote[len(prefix):] if container != "" && f.rootContainer == "" { // we should add container name to remote name if root is empty objInfo.remote = path.Join(container, objInfo.remote) } } return objInfo } // MimeType of an Object if known, "" otherwise func (o *Object) MimeType(_ context.Context) string { return o.contentType } // String implements the Stringer interface func (o *Object) String() string { if o == nil { return "" } return o.filePath } // ID returns the ID of the Object if known, or "" if not func (o *Object) ID() string { return o.objectID().EncodeToString() } func (o *Object) objectID() oid.ID { objID, _ := o.Object.ID() return objID } // Remote returns the remote path func (o *Object) Remote() string { return o.remote } // ModTime returns the modification time of the object func (o *Object) ModTime(_ context.Context) time.Time { return o.timestamp } // Size returns the size of an object in bytes func (o *Object) Size() int64 { return int64(o.PayloadSize()) } // Fs returns the parent Fs func (o *Object) Fs() fs.Info { return o.fs } // Hash returns the SHA-256 hash of an object returning a lowercase hex string func (o *Object) Hash(_ context.Context, ty hash.Type) (string, error) { if ty != hash.SHA256 { return "", hash.ErrUnsupported } payloadCheckSum, _ := o.PayloadChecksum() return hex.EncodeToString(payloadCheckSum.Value()), nil } // Storable says whether this object can be stored func (o *Object) Storable() bool { return true } // SetModTime sets the modification time of the local fs object func (o *Object) SetModTime(context.Context, time.Time) error { return fs.ErrorCantSetModTime } // BuffCloser is wrapper to load files from frostfs. type BuffCloser struct { io.Reader } // Close implements the Closer interface func (bc *BuffCloser) Close() error { return nil } // Open an object for read func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { var isRange bool offset, length := uint64(0), o.PayloadSize() fs.FixRangeOption(options, int64(o.PayloadSize())) for _, option := range options { switch option := option.(type) { case *fs.RangeOption: isRange = true offset = uint64(option.Start) if option.End < 0 { option.End = int64(o.PayloadSize()) + option.End } length = uint64(option.End - option.Start + 1) case *fs.SeekOption: isRange = true offset = uint64(option.Offset) length = o.PayloadSize() - uint64(option.Offset) default: if option.Mandatory() { fs.Logf(o, "Unsupported mandatory option: %v", option) } } } cnrID, _ := o.ContainerID() addr := newAddress(cnrID, o.objectID()) if isRange { var prm pool.PrmObjectRange prm.SetAddress(addr) prm.SetOffset(offset) prm.SetLength(length) rangeReader, err := o.fs.pool.ObjectRange(ctx, prm) if err != nil { return nil, err } return &rangeReader, nil } var prm pool.PrmObjectGet prm.SetAddress(addr) // we cannot use ObjectRange in this case because it panics if zero length range is requested res, err := o.fs.pool.GetObject(ctx, prm) if err != nil { return nil, fmt.Errorf("couldn't get object %s: %w", addr, err) } return res.Payload, nil } // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name } // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { return f.root } // String converts this Fs to a string func (f *Fs) String() string { if f.rootContainer == "" { return "FrostFS root" } if f.rootDirectory == "" { return fmt.Sprintf("FrostFS container %s", f.rootContainer) } return fmt.Sprintf("FrostFS container %s path %s", f.rootContainer, f.rootDirectory) } // Precision of the remote func (f *Fs) Precision() time.Duration { return time.Second } // Hashes returns the supported hash sets. func (f *Fs) Hashes() hash.Set { return hash.Set(hash.SHA256) } // Features returns the optional features of this Fs func (f *Fs) Features() *fs.Features { return f.features } // List the objects and directories in dir into entries. func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) if rootDirName == "" { if containerPath != "" { return nil, fs.ErrorListBucketRequired } return f.listContainers(ctx) } return f.listEntries(ctx, rootDirName, containerPath, dir, false) } // ListR lists the objects and directories of the Fs starting // from dir recursively into out. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error { rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) list := walk.NewListRHelper(callback) if rootDirName == "" { if containerPath != "" { return fs.ErrorListBucketRequired } containers, err := f.listContainers(ctx) if err != nil { return err } for _, containerDir := range containers { if err = f.listR(ctx, list, containerDir.Remote(), containerPath, dir); err != nil { return err } } return list.Flush() } if err := f.listR(ctx, list, rootDirName, containerPath, dir); err != nil { return err } return list.Flush() } func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, rootDirName, containerPath, dir string) error { entries, err := f.listEntries(ctx, rootDirName, containerPath, dir, true) if err != nil { return err } for _, entry := range entries { if err = list.Add(entry); err != nil { return err } } return nil } func (f *Fs) resolveOrCreateContainer(ctx context.Context, rootDirName string) (cid.ID, error) { // Due to the fact that this method is called when performing "put" operations, // which can be run in parallel in several goroutines, // we need to use a global lock here so that if a requested container is missing, // multiple goroutines do not attempt to create a container with the same name simultaneously, // which could cause unexpected behavior. f.m.Lock() defer f.m.Unlock() cnrID, err := f.resolveContainerIDHelper(ctx, rootDirName) if err == nil { return cnrID, err } if cnrID, err = f.createContainer(ctx, rootDirName); err != nil { delete(f.containerIDCache, rootDirName) return cid.ID{}, fmt.Errorf("createContainer: %w", err) } f.containerIDCache[rootDirName] = cnrID.String() return cnrID, nil } // Put the Object into the container func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { rootDirName, containerPath := bucket.Split(filepath.Join(f.root, src.Remote())) cnrID, err := parseContainerID(rootDirName) if err != nil { if cnrID, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil { return nil, err } } ids, err := f.findObjectsFilePath(ctx, cnrID, containerPath) if err != nil { return nil, err } headers := fillHeaders(ctx, containerPath, src, options...) objHeader := formObject(f.owner, cnrID, filepath.Base(containerPath), headers) var prmPut pool.PrmObjectPut prmPut.SetHeader(*objHeader) prmPut.SetPayload(in) objID, err := f.pool.PutObject(ctx, prmPut) if err != nil { return nil, err } var prmHead pool.PrmObjectHead prmHead.SetAddress(newAddress(cnrID, objID.ObjectID)) obj, err := f.pool.HeadObject(ctx, prmHead) if err != nil { return nil, err } for _, id := range ids { var prmDelete pool.PrmObjectDelete prmDelete.SetAddress(newAddress(cnrID, id)) _ = f.pool.DeleteObject(ctx, prmDelete) } return newObject(f, obj, ""), nil } func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, options ...fs.OpenOption) map[string]string { if mimeTyper, ok := src.(fs.MimeTyper); ok { options = append(options, &fs.HTTPOption{ Key: object.AttributeContentType, Value: mimeTyper.MimeType(ctx), }) } headers := map[string]string{object.AttributeFilePath: filePath} for _, option := range options { key, value := option.Header() lowerKey := strings.ToLower(key) if len(value) == 0 { continue } switch lowerKey { case "": // ignore empty headers case "content-type": headers[object.AttributeContentType] = value case "timestamp": headers[object.AttributeTimestamp] = value default: if value != "" { headers[key] = value } } } if headers[object.AttributeTimestamp] == "" { headers[object.AttributeTimestamp] = strconv.FormatInt(src.ModTime(ctx).UTC().Unix(), 10) } return headers } // Update the Object from in with modTime and size func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { // When updating an object, the path to it should not change. src = robject.NewStaticObjectInfo(o.Remote(), src.ModTime(ctx), src.Size(), src.Storable(), nil, src.Fs()) rootDirName, containerPath := bucket.Split(filepath.Join(o.fs.root, src.Remote())) var cnrID cid.ID var err error if cnrID, err = o.fs.parseContainer(ctx, rootDirName); err != nil { return fmt.Errorf("parse container: %w", err) } headers := fillHeaders(ctx, containerPath, src, options...) objHeader := formObject(o.fs.owner, cnrID, filepath.Base(containerPath), headers) var prmPatch pool.PrmObjectPatch var rng object.Range rng.SetOffset(0) rng.SetLength(uint64(o.Size())) prmPatch.SetAddress(newAddress(cnrID, o.objectID())) prmPatch.SetNewAttributes(objHeader.Attributes()) prmPatch.SetRange(&rng) prmPatch.SetPayloadReader(in) prmPatch.SetReplaceAttributes(true) patchRes, err := o.fs.pool.PatchObject(ctx, prmPatch) if err != nil { return fmt.Errorf("patch object: %w", err) } // If the object IDs match, it means we have updated the object itself, // and the update process can be interrupted at this point. if patchRes.ObjectID.Equals(o.objectID()) { return nil } // Since the PatchObject method does not delete the source object, we need to delete it explicitly. if err = o.Remove(ctx); err != nil { fs.Logf(o, "couldn't remove old file after update '%s': %s", o.objectID(), err.Error()) } var prmHead pool.PrmObjectHead prmHead.SetAddress(newAddress(cnrID, patchRes.ObjectID)) obj, err := o.fs.pool.HeadObject(ctx, prmHead) if err != nil { return fmt.Errorf("fetch head object: %w", err) } objInfo := newObject(o.fs, obj, "") o.filePath = objInfo.filePath o.remote = objInfo.remote o.name = objInfo.name o.timestamp = objInfo.timestamp o.Object = &obj return nil } func (f *Fs) getContainerNameAndZone(containerStr string) (string, string) { return getContainerNameAndZone(containerStr, f.opt.DefaultContainerZone) } // Remove an object func (o *Object) Remove(ctx context.Context) error { cnrID, _ := o.ContainerID() var prm pool.PrmObjectDelete prm.SetAddress(newAddress(cnrID, o.objectID())) return o.fs.pool.DeleteObject(ctx, prm) } // NewObject finds the Object at remote. func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { containerStr, containerPath := bucket.Split(filepath.Join(f.root, remote)) cnrID, err := f.parseContainer(ctx, containerStr) if err != nil { return nil, fs.ErrorDirNotFound } ids, err := f.findObjectsFilePath(ctx, cnrID, containerPath) if err != nil { return nil, err } if len(ids) == 0 { return nil, fs.ErrorObjectNotFound } var prm pool.PrmObjectHead prm.SetAddress(newAddress(cnrID, ids[0])) obj, err := f.pool.HeadObject(ctx, prm) if err != nil { return nil, fmt.Errorf("head object: %w", err) } return newObject(f, obj, ""), nil } func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Chain, cnrID cid.ID) error { prmListAPEChains := pool.PrmListAPEChains{ Target: ape.ChainTarget{ TargetType: ape.TargetTypeContainer, Name: cnrID.EncodeToString(), }, } checkCtxDone := func(ctx context.Context) error { if _, ok := ctx.Deadline(); ok { return fmt.Errorf("waiting time for storing APE chain has expired") } return fmt.Errorf("waiting for the APE chain to be stored has been cancelled") } ctx, cancel := context.WithTimeout(ctx, time.Duration(f.opt.APECacheInvalidationTimeout)) defer cancel() for { chains, err := f.pool.ListAPEChains(ctx, prmListAPEChains) if err != nil { return fmt.Errorf("list APE chains: %w", err) } for _, rawChain := range chains { var ch chain.Chain err = ch.UnmarshalBinary(rawChain.Raw) if err != nil { return fmt.Errorf("unmarshal chain: %w", err) } if bytes.Equal(ch.ID, expectedCh.ID) { // At the moment, according to the core team, there is no way through the API // to check whether the APE chain stored in the contact has been applied to the container. // So after we make sure that the APE chain is stored, we just wait for a certain period of time // (8 seconds by default, the time until the next block and cache invalidation) select { case <-ctx.Done(): return checkCtxDone(ctx) case <-time.After(time.Duration(f.opt.APECacheInvalidationDuration)): return nil } } } select { case <-ctx.Done(): return checkCtxDone(ctx) case <-time.After(time.Duration(f.opt.APEChainCheckInterval)): } } } func (f *Fs) createContainer(ctx context.Context, rootDirName string) (cid.ID, error) { var policy netmap.PlacementPolicy if err := policy.DecodeString(f.opt.PlacementPolicy); err != nil { return cid.ID{}, fmt.Errorf("parse placement policy: %w", err) } var cnr container.Container cnr.Init() cnr.SetPlacementPolicy(policy) cnr.SetOwner(*f.owner) container.SetCreationTime(&cnr, time.Now()) container.SetName(&cnr, rootDirName) cnrName, cnrZone := f.getContainerNameAndZone(rootDirName) var domain container.Domain domain.SetZone(cnrZone) domain.SetName(cnrName) container.WriteDomain(&cnr, domain) if err := pool.SyncContainerWithNetwork(ctx, &cnr, f.pool); err != nil { return cid.ID{}, fmt.Errorf("sync container with network: %w", err) } prm := pool.PrmContainerPut{ ClientParams: sdkClient.PrmContainerPut{ Container: &cnr, }, } cnrID, err := f.pool.PutContainer(ctx, prm) if err != nil { return cnrID, fmt.Errorf("put container: %w", err) } ch := chain.Chain{ ID: chain.ID("rclone/" + f.owner.String()), Rules: f.opt.APERules, } data, err := ch.MarshalBinary() if err != nil { return cid.ID{}, fmt.Errorf("marshal chain: %w", err) } prmAddAPEChain := pool.PrmAddAPEChain{ Target: ape.ChainTarget{ TargetType: ape.TargetTypeContainer, Name: cnrID.EncodeToString(), }, Chain: ape.Chain{Raw: data}, } if err = f.pool.AddAPEChain(ctx, prmAddAPEChain); err != nil { return cid.ID{}, fmt.Errorf("add APE chain: %w", err) } if err = f.waitForAPECacheInvalidated(ctx, ch, cnrID); err != nil { return cid.ID{}, fmt.Errorf("wait for APE cache to be invalidated: %w", err) } return cnrID, nil } // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { rootDirName, _ := bucket.Split(path.Join(f.root, dir)) if rootDirName == "" { return nil } _, err := parseContainerID(rootDirName) if err != nil { if _, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil { return err } } return nil } // Rmdir deletes the bucket if the fs is at the root func (f *Fs) Rmdir(ctx context.Context, dir string) error { rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) if rootDirName == "" || containerPath != "" { return nil } cnrID, err := f.parseContainer(ctx, rootDirName) if err != nil { return fs.ErrorDirNotFound } isEmpty, err := f.isContainerEmpty(ctx, cnrID) if err != nil { return err } if !isEmpty { return fs.ErrorDirectoryNotEmpty } prm := pool.PrmContainerDelete{ ContainerID: cnrID, } f.m.Lock() defer f.m.Unlock() if err = f.pool.DeleteContainer(ctx, prm); err != nil { return fmt.Errorf("couldn't delete container %s '%s': %w", cnrID, rootDirName, err) } delete(f.containerIDCache, rootDirName) return nil } // Purge deletes all the files and directories including the old versions. func (f *Fs) Purge(ctx context.Context, dir string) error { rootDirName, containerPath := bucket.Split(path.Join(f.root, dir)) cnrID, err := f.parseContainer(ctx, rootDirName) if err != nil { return nil } if err = f.deleteByPrefix(ctx, cnrID, containerPath); err != nil { return fmt.Errorf("delete by prefix: %w", err) } return nil } // setRoot changes the root of the Fs func (f *Fs) setRoot(root string) { f.root = strings.Trim(root, "/") f.rootContainer, f.rootDirectory = bucket.Split(f.root) } func parseContainerID(containerStr string) (cid.ID, error) { var cnrID cid.ID err := cnrID.DecodeString(containerStr) return cnrID, err } func getContainerIDByNameAndZone(dirEntry fs.DirEntry, cnrName, cnrZone, defaultZone string) (cnrID cid.ID, ok bool, err error) { actualName, actualZone := getContainerNameAndZone(dirEntry.Remote(), defaultZone) if cnrName != actualName || cnrZone != actualZone { return } var idEr fs.IDer if idEr, ok = dirEntry.(fs.IDer); ok { err = cnrID.DecodeString(idEr.ID()) } return } func resolveContainerIDWithNNS(resolver *resolver.NNS, cnrName, cnrZone string) (cid.ID, error) { var d container.Domain d.SetZone(cnrZone) d.SetName(cnrName) if cnrID, err := resolver.ResolveContainerDomain(d); err == nil { return cnrID, err } return cid.ID{}, fmt.Errorf("couldn't resolve container with name '%s' and zone '%s'", cnrName, cnrZone) } func (f *Fs) resolveCIDByRootDirName(ctx context.Context, rootDirName string) (cid.ID, error) { cnrName, cnrZone := f.getContainerNameAndZone(rootDirName) if cnrName == "" { return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName) } if f.resolver != nil { return resolveContainerIDWithNNS(f.resolver, cnrName, cnrZone) } if dirEntries, err := f.listContainers(ctx); err == nil { for _, dirEntry := range dirEntries { if cnrID, ok, err := getContainerIDByNameAndZone(dirEntry, cnrName, cnrZone, f.opt.DefaultContainerZone); ok { return cnrID, err } } } return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName) } func (f *Fs) resolveContainerIDHelper(ctx context.Context, rootDirName string) (cid.ID, error) { cnrIDStr, ok := f.containerIDCache[rootDirName] if ok { return parseContainerID(cnrIDStr) } cnrID, err := f.resolveCIDByRootDirName(ctx, rootDirName) if err != nil { return cid.ID{}, err } f.containerIDCache[rootDirName] = cnrID.String() return cnrID, nil } func (f *Fs) resolveContainerID(ctx context.Context, rootDirName string) (cid.ID, error) { f.m.Lock() defer f.m.Unlock() return f.resolveContainerIDHelper(ctx, rootDirName) } func (f *Fs) parseContainer(ctx context.Context, rootDirName string) (cid.ID, error) { cnrID, err := parseContainerID(rootDirName) if err != nil { return f.resolveContainerID(ctx, rootDirName) } return cnrID, nil } func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, directory string, recursive bool) (fs.DirEntries, error) { cnrID, err := f.parseContainer(ctx, rootDirName) if err != nil { return nil, fs.ErrorDirNotFound } ids, err := f.findObjectsPrefix(ctx, cnrID, containerPath) if err != nil { return nil, err } res := make([]fs.DirEntry, 0, len(ids)) dirs := make(map[string]*fs.Dir) objs := make(map[string]*Object) for _, id := range ids { var prm pool.PrmObjectHead prm.SetAddress(newAddress(cnrID, id)) obj, err := f.pool.HeadObject(ctx, prm) if err != nil { return nil, err } objInf := newObject(f, obj, rootDirName) if !recursive { withoutPath := strings.TrimPrefix(objInf.filePath, containerPath) trimPrefixSlash := strings.TrimPrefix(withoutPath, "/") // emulate directories if index := strings.Index(trimPrefixSlash, "/"); index >= 0 { dir := fs.NewDir(filepath.Join(directory, trimPrefixSlash[:index]), time.Time{}) dir.SetID(filepath.Join(containerPath, dir.Remote())) dirs[dir.ID()] = dir continue } } if o, ok := objs[objInf.remote]; !ok || o.timestamp.Before(objInf.timestamp) { objs[objInf.remote] = objInf } } for _, dir := range dirs { res = append(res, dir) } for _, obj := range objs { res = append(res, obj) } return res, nil } func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) { prmList := pool.PrmContainerList{ OwnerID: *f.owner, } containers, err := f.pool.ListContainers(ctx, prmList) if err != nil { return nil, err } res := make([]fs.DirEntry, len(containers)) for i, containerID := range containers { prm := pool.PrmContainerGet{ ContainerID: containerID, } cnr, err := f.pool.GetContainer(ctx, prm) if err != nil { return nil, fmt.Errorf("couldn't get container '%s': %w", containerID, err) } res[i] = newDir(containerID, cnr, f.opt.DefaultContainerZone) } return res, nil } func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath string) ([]oid.ID, error) { return f.findObjects(ctx, cnrID, searchFilter{ Header: object.AttributeFilePath, Value: filePath, MatchType: object.MatchStringEqual, }) } func (f *Fs) findObjectsPrefix(ctx context.Context, cnrID cid.ID, prefix string) ([]oid.ID, error) { return f.findObjects(ctx, cnrID, searchFilter{ Header: object.AttributeFilePath, Value: prefix, MatchType: object.MatchCommonPrefix, }) } func (f *Fs) findObjects(ctx context.Context, cnrID cid.ID, filters ...searchFilter) ([]oid.ID, error) { sf := object.NewSearchFilters() sf.AddRootFilter() for _, filter := range filters { sf.AddFilter(filter.Header, filter.Value, filter.MatchType) } return f.searchObjects(ctx, cnrID, sf) } func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) error { filters := object.NewSearchFilters() filters.AddRootFilter() filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) var prmSearch pool.PrmObjectSearch prmSearch.SetContainerID(cnrID) prmSearch.SetFilters(filters) res, err := f.pool.SearchObjects(ctx, prmSearch) if err != nil { return fmt.Errorf("init searching using client: %w", err) } defer res.Close() var ( inErr error found bool prmDelete pool.PrmObjectDelete addr oid.Address ) addr.SetContainer(cnrID) err = res.Iterate(func(id oid.ID) bool { found = true addr.SetObject(id) prmDelete.SetAddress(addr) if err = f.pool.DeleteObject(ctx, prmDelete); err != nil { inErr = fmt.Errorf("delete object: %w", err) return true } return false }) if err == nil { err = inErr } if err != nil { return fmt.Errorf("iterate objects: %w", err) } if !found { return fs.ErrorDirNotFound } return nil } func (f *Fs) isContainerEmpty(ctx context.Context, cnrID cid.ID) (bool, error) { filters := object.NewSearchFilters() filters.AddRootFilter() var prm pool.PrmObjectSearch prm.SetContainerID(cnrID) prm.SetFilters(filters) res, err := f.pool.SearchObjects(ctx, prm) if err != nil { return false, fmt.Errorf("init searching using client: %w", err) } defer res.Close() isEmpty := true err = res.Iterate(func(id oid.ID) bool { isEmpty = false return true }) if err != nil { return false, fmt.Errorf("iterate objects: %w", err) } return isEmpty, nil } func (f *Fs) searchObjects(ctx context.Context, cnrID cid.ID, filters object.SearchFilters) ([]oid.ID, error) { var prm pool.PrmObjectSearch prm.SetContainerID(cnrID) prm.SetFilters(filters) res, err := f.pool.SearchObjects(ctx, prm) if err != nil { return nil, fmt.Errorf("init searching using client: %w", err) } defer res.Close() var buf []oid.ID err = res.Iterate(func(id oid.ID) bool { buf = append(buf, id) return false }) if err != nil { return nil, fmt.Errorf("iterate objects: %w", err) } return buf, nil } // Check the interfaces are satisfied var ( _ fs.Fs = &Fs{} _ fs.ListRer = &Fs{} _ fs.Purger = &Fs{} _ fs.PutStreamer = &Fs{} _ fs.Shutdowner = &Fs{} _ fs.Object = &Object{} _ fs.MimeTyper = &Object{} )