rclone/backend/frostfs/frostfs.go
Aleksey Kravchenko be95531bfd [#5] Add container zone names support.
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
2025-01-27 16:33:41 +03:00

1254 lines
33 KiB
Go

// Package frostfs provides an interface to FrostFS decentralized distributed object storage system
package frostfs
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"math"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
robject "github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
)
// init registers the frostfs backend with rclone so it can be selected
// via `type = frostfs` in a remote's configuration.
func init() {
	fs.Register(&fs.RegInfo{
		Name:        "frostfs",
		Description: "Distributed, decentralized object storage FrostFS",
		NewFs:       NewFs,
		Options: []fs.Option{
			{
				// One or more node endpoints; optional ",priority[,weight]" suffixes
				// control pool routing (see examples below).
				Name:     "endpoint",
				Help:     "Endpoints to connect to FrostFS node",
				Required: true,
				Examples: []fs.OptionExample{
					{
						Value: "s01.frostfs.devenv:8080",
						Help:  "One endpoint.",
					},
					{
						Value: "s01.frostfs.devenv:8080 s02.frostfs.devenv:8080",
						Help:  "Multiple endpoints to form pool.",
					},
					{
						Value: "s01.frostfs.devenv:8080,1 s02.frostfs.devenv:8080,2",
						Help:  "Multiple endpoints with priority (lower value has higher priority). Until s01 is healthy, all requests will be sent to it.",
					},
					{
						Value: "s01.frostfs.devenv:8080,1,1 s02.frostfs.devenv:8080,2,1 s03.frostfs.devenv:8080,2,9",
						Help:  "Multiple endpoints with priority and weights. After the s01 endpoint is unhealthy, requests will be sent to the s02 and s03 endpoints in proportions of 10% and 90%, respectively.",
					},
				},
			},
			{
				Name:    "connection_timeout",
				Default: 4 * time.Second,
				Help:    "FrostFS connection timeout",
			},
			{
				Name:    "request_timeout",
				Default: 12 * time.Second,
				Help:    "FrostFS request timeout",
			},
			{
				Name:    "rebalance_interval",
				Default: 15 * time.Second,
				Help:    "FrostFS rebalance connections interval",
			},
			{
				// Defaults to "never expires" (max epoch value).
				Name:    "session_expiration",
				Default: math.MaxUint32,
				Help:    "FrostFS session expiration epoch",
			},
			{
				Name:    "ape_cache_invalidation_duration",
				Default: 8 * time.Second,
				Help:    "APE cache invalidation duration",
			},
			{
				Name:    "ape_cache_invalidation_timeout",
				Default: 24 * time.Second,
				Help:    "APE cache invalidation timeout",
			},
			{
				Name:    "ape_chain_check_interval",
				Default: 500 * time.Millisecond,
				Help:    "The interval for verifying that the APE chain is saved in FrostFS.",
			},
			{
				// Optional; enables NNS-based container name resolution.
				Name: "rpc_endpoint",
				Help: "Endpoint to connect to Neo rpc node",
			},
			{
				Name:     "wallet",
				Help:     "Path to wallet",
				Required: true,
			},
			{
				Name: "address",
				Help: "Address of account",
			},
			{
				Name: "password",
				Help: "Password to decrypt wallet",
			},
			{
				Name:    "placement_policy",
				Default: "REP 3",
				Help:    "Placement policy for new containers",
				Examples: []fs.OptionExample{
					{
						Value: "REP 3",
						Help:  "Container will have 3 replicas",
					},
				},
			},
			{
				Name:    "default_container_zone",
				Default: "container",
				Help:    "The name of the zone in which containers will be created or resolved if the zone name is not explicitly specified with the container name.",
			},
			{
				Name:    "container_creation_policy",
				Default: "private",
				Help:    "Container creation policy for new containers",
				Examples: []fs.OptionExample{
					{
						Value: "public-read-write",
						Help:  "Public container, anyone can read and write",
					},
					{
						Value: "public-read",
						Help:  "Public container, owner can read and write, others can only read",
					},
					{
						Value: "private",
						Help:  "Private container, only owner has access to it",
					},
				},
			},
		},
	})
}
// Options defines the configuration for this backend
type Options struct {
	FrostfsEndpoint              string      `config:"endpoint"`           // space-separated node endpoints, optionally with priority/weight
	FrostfsConnectionTimeout     fs.Duration `config:"connection_timeout"` // dial timeout per node
	FrostfsRequestTimeout        fs.Duration `config:"request_timeout"`    // per-request timeout
	FrostfsRebalanceInterval     fs.Duration `config:"rebalance_interval"` // pool health-check/rebalance period
	FrostfsSessionExpiration     uint64      `config:"session_expiration"` // epoch at which sessions expire
	APECacheInvalidationDuration fs.Duration `config:"ape_cache_invalidation_duration"`
	APECacheInvalidationTimeout  fs.Duration `config:"ape_cache_invalidation_timeout"`
	APEChainCheckInterval        fs.Duration `config:"ape_chain_check_interval"`
	RPCEndpoint                  string      `config:"rpc_endpoint"` // Neo RPC node for NNS resolution (optional)
	Wallet                       string      `config:"wallet"`       // path to the NEP-6 wallet file
	Address                      string      `config:"address"`
	Password                     string      `config:"password"`
	PlacementPolicy              string      `config:"placement_policy"`
	DefaultContainerZone         string      `config:"default_container_zone"`
	ContainerCreationPolicy      string      `config:"container_creation_policy"`
	// APERules is derived from ContainerCreationPolicy in NewFs; it is not
	// read from the config map directly.
	APERules []chain.Rule `config:"-"`
}
// Fs represents a remote frostfs server
type Fs struct {
	name          string // the name of the remote
	root          string // root of the bucket - ignore all objects above this
	opt           *Options
	ctx           context.Context
	pool          *pool.Pool // connection pool to FrostFS nodes
	owner         *user.ID   // account owning created containers/objects
	rootContainer string     // container part of root ("" when at FrostFS root)
	rootDirectory string     // directory part of root within rootContainer
	features      *fs.Features
	resolver      *resolver.NNS // NNS resolver; may be nil when no RPC endpoint is configured
	m             sync.Mutex    // guards containerIDCache and container create/delete
	containerIDCache map[string]string // container name -> container ID string
}
// searchFilter describes a single attribute filter for an object search.
type searchFilter struct {
	Header    string // attribute key, e.g. object.AttributeFilePath
	Value     string // value to match against
	MatchType object.SearchMatchType
}
// Object describes a frostfs object
type Object struct {
	*object.Object
	fs          *Fs
	name        string // FileName attribute (or object ID when absent)
	remote      string // path relative to the Fs root
	filePath    string // full FilePath attribute within the container
	contentType string // Content-Type attribute, if set
	timestamp   time.Time // Timestamp attribute, zero when absent/unparsable
}
// Shutdown the backend, closing any background tasks and any
// cached connections.
func (f *Fs) Shutdown(_ context.Context) error {
	// Closing the pool tears down all node connections.
	f.pool.Close()
	return nil
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// Put does not require a known size, so this simply delegates.
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.Put(ctx, in, src, options...)
}
// NewFs creates a new Fs object from the name and root. It connects to
// the host specified in the config file.
//
// Returns fs.ErrorIsFile (with an Fs pointing at the parent) when root
// addresses an existing file rather than a directory, per rclone convention.
func NewFs(ctx context.Context, name string, root string, m configmap.Mapper) (fs.Fs, error) {
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	// Translate the human-readable creation policy into APE rules used
	// when new containers are created.
	opt.APERules, err = parseContainerCreationPolicyString(opt.ContainerCreationPolicy)
	if err != nil {
		return nil, fmt.Errorf("couldn't parse container creation policy: %w", err)
	}
	acc, err := getAccount(opt)
	if err != nil {
		return nil, err
	}
	p, err := createPool(ctx, acc.PrivateKey(), opt)
	if err != nil {
		return nil, err
	}
	// NOTE(review): resolver is later nil-checked, so createNNSResolver
	// presumably returns nil when no RPC endpoint is configured — confirm.
	nnsResolver, err := createNNSResolver(opt)
	if err != nil {
		return nil, err
	}
	var owner user.ID
	user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
	f := &Fs{
		name:             name,
		opt:              opt,
		ctx:              ctx,
		pool:             p,
		owner:            &owner,
		resolver:         nnsResolver,
		containerIDCache: make(map[string]string),
	}
	f.setRoot(root)
	f.features = (&fs.Features{
		DuplicateFiles:    true,
		ReadMimeType:      true,
		WriteMimeType:     true,
		BucketBased:       true,
		BucketBasedRootOK: true,
	}).Fill(ctx, f)
	if f.rootContainer != "" && f.rootDirectory != "" && !strings.HasSuffix(root, "/") {
		// Check to see if the (container,directory) is actually an existing file
		oldRoot := f.root
		newRoot, leaf := path.Split(oldRoot)
		f.setRoot(newRoot)
		_, err := f.NewObject(ctx, leaf)
		if err != nil {
			// File doesn't exist or is a directory so return old f
			f.setRoot(oldRoot)
			return f, nil
		}
		// return an error with a fs which points to the parent
		return f, fs.ErrorIsFile
	}
	return f, nil
}
// newObject constructs an Object from a FrostFS object header.
//
// The FilePath attribute (or FileName when FilePath is absent) becomes the
// object's path; the Fs root directory prefix is stripped so remote names
// are relative to the configured root. When listing from the FrostFS root
// (f.rootContainer == ""), the container name is prepended to the remote.
func newObject(f *Fs, obj object.Object, container string) *Object {
	// we should not include rootDirectory into remote name
	prefix := f.rootDirectory
	if prefix != "" {
		prefix += "/"
	}
	objID, _ := obj.ID()
	objInfo := &Object{
		Object: &obj,
		fs:     f,
		name:   objID.EncodeToString(), // fallback name when FileName attribute is absent
	}
	for _, attr := range obj.Attributes() {
		switch attr.Key() {
		case object.AttributeFileName:
			objInfo.name = attr.Value()
		case object.AttributeFilePath:
			objInfo.filePath = attr.Value()
		case object.AttributeContentType:
			objInfo.contentType = attr.Value()
		case object.AttributeTimestamp:
			value, err := strconv.ParseInt(attr.Value(), 10, 64)
			if err != nil {
				// Ignore malformed timestamps; objInfo.timestamp stays zero.
				continue
			}
			objInfo.timestamp = time.Unix(value, 0)
		}
	}
	if objInfo.filePath == "" {
		objInfo.filePath = objInfo.name
	}
	objInfo.remote = objInfo.filePath
	// Strip the root directory prefix only when the path actually starts with
	// it. The previous strings.Contains check also matched a prefix occurring
	// in the middle of the path and then sliced the wrong leading bytes.
	if strings.HasPrefix(objInfo.remote, prefix) {
		objInfo.remote = objInfo.remote[len(prefix):]
		if container != "" && f.rootContainer == "" {
			// we should add container name to remote name if root is empty
			objInfo.remote = path.Join(container, objInfo.remote)
		}
	}
	return objInfo
}
// MimeType of an Object if known, "" otherwise
func (o *Object) MimeType(_ context.Context) string {
	return o.contentType
}

// String implements the Stringer interface
func (o *Object) String() string {
	if o == nil {
		return "<nil>"
	}
	return o.filePath
}

// ID returns the ID of the Object if known, or "" if not
func (o *Object) ID() string {
	return o.objectID().EncodeToString()
}

// objectID returns the raw FrostFS object ID of this object.
func (o *Object) objectID() oid.ID {
	// Error deliberately ignored: a zero ID encodes to the empty string.
	objID, _ := o.Object.ID()
	return objID
}

// Remote returns the remote path
func (o *Object) Remote() string {
	return o.remote
}

// ModTime returns the modification time of the object
func (o *Object) ModTime(_ context.Context) time.Time {
	return o.timestamp
}

// Size returns the size of an object in bytes
func (o *Object) Size() int64 {
	return int64(o.PayloadSize())
}

// Fs returns the parent Fs
func (o *Object) Fs() fs.Info {
	return o.fs
}

// Hash returns the SHA-256 hash of an object returning a lowercase hex string
func (o *Object) Hash(_ context.Context, ty hash.Type) (string, error) {
	if ty != hash.SHA256 {
		return "", hash.ErrUnsupported
	}
	// The payload checksum is part of the object header, so no payload
	// download is needed.
	payloadCheckSum, _ := o.PayloadChecksum()
	return hex.EncodeToString(payloadCheckSum.Value()), nil
}

// Storable says whether this object can be stored
func (o *Object) Storable() bool {
	return true
}

// SetModTime sets the modification time of the local fs object
//
// FrostFS object attributes are immutable, so this is unsupported.
func (o *Object) SetModTime(context.Context, time.Time) error {
	return fs.ErrorCantSetModTime
}
// BuffCloser is wrapper to load files from frostfs.
//
// It adapts a plain io.Reader into an io.ReadCloser with a no-op Close.
type BuffCloser struct {
	io.Reader
}

// Close implements the Closer interface
func (bc *BuffCloser) Close() error {
	return nil
}
// Open an object for read
//
// Honours fs.RangeOption and fs.SeekOption; a ranged read uses the
// ObjectRange API, a full read uses GetObject.
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
	var isRange bool
	offset, length := uint64(0), o.PayloadSize()
	fs.FixRangeOption(options, int64(o.PayloadSize()))
	for _, option := range options {
		switch option := option.(type) {
		case *fs.RangeOption:
			isRange = true
			offset = uint64(option.Start)
			if option.End < 0 {
				// Negative End means "last -End bytes"; convert to an
				// absolute end offset.
				option.End = int64(o.PayloadSize()) + option.End
			}
			length = uint64(option.End - option.Start + 1)
		case *fs.SeekOption:
			// Seek reads from Offset to the end of the payload.
			isRange = true
			offset = uint64(option.Offset)
			length = o.PayloadSize() - uint64(option.Offset)
		default:
			if option.Mandatory() {
				fs.Logf(o, "Unsupported mandatory option: %v", option)
			}
		}
	}
	cnrID, _ := o.ContainerID()
	addr := newAddress(cnrID, o.objectID())
	if isRange {
		var prm pool.PrmObjectRange
		prm.SetAddress(addr)
		prm.SetOffset(offset)
		prm.SetLength(length)
		rangeReader, err := o.fs.pool.ObjectRange(ctx, prm)
		if err != nil {
			return nil, err
		}
		return &rangeReader, nil
	}
	var prm pool.PrmObjectGet
	prm.SetAddress(addr)
	// we cannot use ObjectRange in this case because it panics if zero length range is requested
	res, err := o.fs.pool.GetObject(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("couldn't get object %s: %w", addr, err)
	}
	return res.Payload, nil
}
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}

// String converts this Fs to a string
func (f *Fs) String() string {
	if f.rootContainer == "" {
		return "FrostFS root"
	}
	if f.rootDirectory == "" {
		return fmt.Sprintf("FrostFS container %s", f.rootContainer)
	}
	return fmt.Sprintf("FrostFS container %s path %s", f.rootContainer, f.rootDirectory)
}

// Precision of the remote
//
// Timestamps are stored as Unix seconds, so precision is one second.
func (f *Fs) Precision() time.Duration {
	return time.Second
}

// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
	return hash.Set(hash.SHA256)
}

// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}
// List the objects and directories in dir into entries.
//
// At the FrostFS root (no container selected) it lists containers; inside a
// container it lists that container's entries non-recursively.
func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
	rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
	if rootDirName != "" {
		return f.listEntries(ctx, rootDirName, containerPath, dir, false)
	}
	// No container in the path: we can only list containers, not objects.
	if containerPath != "" {
		return nil, fs.ErrorListBucketRequired
	}
	return f.listContainers(ctx)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error {
	rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
	list := walk.NewListRHelper(callback)
	if rootDirName == "" {
		// Listing from the FrostFS root: recurse into every container.
		if containerPath != "" {
			return fs.ErrorListBucketRequired
		}
		containers, err := f.listContainers(ctx)
		if err != nil {
			return err
		}
		for _, containerDir := range containers {
			if err = f.listR(ctx, list, containerDir.Remote(), containerPath, dir); err != nil {
				return err
			}
		}
		return list.Flush()
	}
	if err := f.listR(ctx, list, rootDirName, containerPath, dir); err != nil {
		return err
	}
	return list.Flush()
}
// listR recursively lists one container subtree and feeds every entry into
// the ListR helper.
func (f *Fs) listR(ctx context.Context, list *walk.ListRHelper, rootDirName, containerPath, dir string) error {
	entries, err := f.listEntries(ctx, rootDirName, containerPath, dir, true)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if addErr := list.Add(entry); addErr != nil {
			return addErr
		}
	}
	return nil
}
// resolveOrCreateContainer resolves rootDirName to a container ID, creating
// the container if it does not exist yet. The container ID cache is updated
// on success and invalidated on a failed creation attempt.
func (f *Fs) resolveOrCreateContainer(ctx context.Context, rootDirName string) (cid.ID, error) {
	// Due to the fact that this method is called when performing "put" operations,
	// which can be run in parallel in several goroutines,
	// we need to use a global lock here so that if a requested container is missing,
	// multiple goroutines do not attempt to create a container with the same name simultaneously,
	// which could cause unexpected behavior.
	f.m.Lock()
	defer f.m.Unlock()
	cnrID, err := f.resolveContainerIDHelper(ctx, rootDirName)
	if err == nil {
		return cnrID, err
	}
	if cnrID, err = f.createContainer(ctx, rootDirName); err != nil {
		// Drop any stale cache entry so the next attempt re-resolves.
		delete(f.containerIDCache, rootDirName)
		return cid.ID{}, fmt.Errorf("createContainer: %w", err)
	}
	f.containerIDCache[rootDirName] = cnrID.String()
	return cnrID, nil
}
// Put the Object into the container
//
// Creates the container if needed, uploads the payload, and then deletes any
// pre-existing objects with the same file path (replace semantics).
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// Remote paths are always slash-separated, so use path.Join/path.Base
	// rather than filepath, which would use backslashes on Windows. This also
	// matches List/Mkdir/Rmdir, which already use path.Join.
	rootDirName, containerPath := bucket.Split(path.Join(f.root, src.Remote()))
	cnrID, err := parseContainerID(rootDirName)
	if err != nil {
		// Not a literal container ID: resolve by name, creating on demand.
		if cnrID, err = f.resolveOrCreateContainer(ctx, rootDirName); err != nil {
			return nil, err
		}
	}
	// Remember existing objects at this path so they can be removed after a
	// successful upload.
	ids, err := f.findObjectsFilePath(ctx, cnrID, containerPath)
	if err != nil {
		return nil, err
	}
	headers := fillHeaders(ctx, containerPath, src, options...)
	objHeader := formObject(f.owner, cnrID, path.Base(containerPath), headers)
	var prmPut pool.PrmObjectPut
	prmPut.SetHeader(*objHeader)
	prmPut.SetPayload(in)
	objID, err := f.pool.PutObject(ctx, prmPut)
	if err != nil {
		return nil, err
	}
	// Re-read the header so the returned Object carries server-side metadata.
	var prmHead pool.PrmObjectHead
	prmHead.SetAddress(newAddress(cnrID, objID.ObjectID))
	obj, err := f.pool.HeadObject(ctx, prmHead)
	if err != nil {
		return nil, err
	}
	// Best-effort cleanup of the replaced objects; errors are ignored.
	for _, id := range ids {
		var prmDelete pool.PrmObjectDelete
		prmDelete.SetAddress(newAddress(cnrID, id))
		_ = f.pool.DeleteObject(ctx, prmDelete)
	}
	return newObject(f, obj, ""), nil
}
// fillHeaders builds the FrostFS attribute map for an upload: the file path,
// content type (from the source's MimeTyper or an HTTP option), a timestamp
// (from an option or the source's modification time), plus any other
// non-empty headers supplied via options.
func fillHeaders(ctx context.Context, filePath string, src fs.ObjectInfo, options ...fs.OpenOption) map[string]string {
	if mimeTyper, ok := src.(fs.MimeTyper); ok {
		options = append(options, &fs.HTTPOption{
			Key:   object.AttributeContentType,
			Value: mimeTyper.MimeType(ctx),
		})
	}
	headers := map[string]string{object.AttributeFilePath: filePath}
	for _, option := range options {
		key, value := option.Header()
		// Skip empty keys and empty values outright; the original code
		// re-checked value != "" in the default branch, which was dead code.
		if key == "" || value == "" {
			continue
		}
		switch strings.ToLower(key) {
		case "content-type":
			headers[object.AttributeContentType] = value
		case "timestamp":
			headers[object.AttributeTimestamp] = value
		default:
			headers[key] = value
		}
	}
	// Fall back to the source modification time (Unix seconds, UTC).
	if headers[object.AttributeTimestamp] == "" {
		headers[object.AttributeTimestamp] = strconv.FormatInt(src.ModTime(ctx).UTC().Unix(), 10)
	}
	return headers
}
// Update the Object from in with modTime and size
//
// Uses FrostFS object patching to replace the whole payload and attributes.
// If patching produced a new object ID, the old object is removed and this
// Object is re-pointed at the new one.
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
	// When updating an object, the path to it should not change.
	src = robject.NewStaticObjectInfo(o.Remote(), src.ModTime(ctx), src.Size(), src.Storable(), nil, src.Fs())
	// NOTE(review): filepath.Join/Base use backslashes on Windows; remote
	// paths are slash-separated, so path.Join/Base would be more correct
	// here (as used elsewhere in this file) — TODO confirm and align.
	rootDirName, containerPath := bucket.Split(filepath.Join(o.fs.root, src.Remote()))
	var cnrID cid.ID
	var err error
	if cnrID, err = o.fs.parseContainer(ctx, rootDirName); err != nil {
		return fmt.Errorf("parse container: %w", err)
	}
	headers := fillHeaders(ctx, containerPath, src, options...)
	objHeader := formObject(o.fs.owner, cnrID, filepath.Base(containerPath), headers)
	var prmPatch pool.PrmObjectPatch
	var rng object.Range
	// Patch the entire current payload range with the new content.
	rng.SetOffset(0)
	rng.SetLength(uint64(o.Size()))
	prmPatch.SetAddress(newAddress(cnrID, o.objectID()))
	prmPatch.SetNewAttributes(objHeader.Attributes())
	prmPatch.SetRange(&rng)
	prmPatch.SetPayloadReader(in)
	prmPatch.SetReplaceAttributes(true)
	patchRes, err := o.fs.pool.PatchObject(ctx, prmPatch)
	if err != nil {
		return fmt.Errorf("patch object: %w", err)
	}
	// If the object IDs match, it means we have updated the object itself,
	// and the update process can be interrupted at this point.
	if patchRes.ObjectID.Equals(o.objectID()) {
		return nil
	}
	// Since the PatchObject method does not delete the source object, we need to delete it explicitly.
	if err = o.Remove(ctx); err != nil {
		fs.Logf(o, "couldn't remove old file after update '%s': %s", o.objectID(), err.Error())
	}
	// Refresh this Object's metadata from the newly created object's header.
	var prmHead pool.PrmObjectHead
	prmHead.SetAddress(newAddress(cnrID, patchRes.ObjectID))
	obj, err := o.fs.pool.HeadObject(ctx, prmHead)
	if err != nil {
		return fmt.Errorf("fetch head object: %w", err)
	}
	objInfo := newObject(o.fs, obj, "")
	o.filePath = objInfo.filePath
	o.remote = objInfo.remote
	o.name = objInfo.name
	o.timestamp = objInfo.timestamp
	o.Object = &obj
	return nil
}
// getContainerNameAndZone splits containerStr into a container name and
// zone, falling back to the configured default zone when none is given.
func (f *Fs) getContainerNameAndZone(containerStr string) (string, string) {
	return getContainerNameAndZone(containerStr, f.opt.DefaultContainerZone)
}
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
	// Error from ContainerID deliberately ignored; a zero container ID would
	// simply make the delete fail below.
	cnrID, _ := o.ContainerID()
	var prm pool.PrmObjectDelete
	prm.SetAddress(newAddress(cnrID, o.objectID()))
	return o.fs.pool.DeleteObject(ctx, prm)
}
// NewObject finds the Object at remote.
//
// Returns fs.ErrorDirNotFound when the container cannot be resolved and
// fs.ErrorObjectNotFound when no object has the requested file path.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	// Remote paths are always slash-separated, so use path.Join rather than
	// filepath.Join (which would use backslashes on Windows), matching the
	// other lookup methods in this file.
	containerStr, containerPath := bucket.Split(path.Join(f.root, remote))
	cnrID, err := f.parseContainer(ctx, containerStr)
	if err != nil {
		return nil, fs.ErrorDirNotFound
	}
	ids, err := f.findObjectsFilePath(ctx, cnrID, containerPath)
	if err != nil {
		return nil, err
	}
	if len(ids) == 0 {
		return nil, fs.ErrorObjectNotFound
	}
	// Duplicates are possible; take the first match.
	var prm pool.PrmObjectHead
	prm.SetAddress(newAddress(cnrID, ids[0]))
	obj, err := f.pool.HeadObject(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("head object: %w", err)
	}
	return newObject(f, obj, ""), nil
}
// waitForAPECacheInvalidated polls the container's APE chains until the
// expected chain appears, then waits one cache-invalidation period so the
// chain takes effect. It gives up after APECacheInvalidationTimeout.
func (f *Fs) waitForAPECacheInvalidated(ctx context.Context, expectedCh chain.Chain, cnrID cid.ID) error {
	prmListAPEChains := pool.PrmListAPEChains{
		Target: ape.ChainTarget{
			TargetType: ape.TargetTypeContainer,
			Name:       cnrID.EncodeToString(),
		},
	}
	// Distinguish timeout from cancellation. The original version checked
	// ctx.Deadline() existence, which is always true here (the context is
	// created by WithTimeout below), so the cancellation branch was
	// unreachable; inspect the actual context error instead.
	checkCtxDone := func(ctx context.Context) error {
		if ctx.Err() == context.DeadlineExceeded {
			return fmt.Errorf("waiting time for storing APE chain has expired")
		}
		return fmt.Errorf("waiting for the APE chain to be stored has been cancelled")
	}
	ctx, cancel := context.WithTimeout(ctx, time.Duration(f.opt.APECacheInvalidationTimeout))
	defer cancel()
	for {
		chains, err := f.pool.ListAPEChains(ctx, prmListAPEChains)
		if err != nil {
			return fmt.Errorf("list APE chains: %w", err)
		}
		for _, rawChain := range chains {
			var ch chain.Chain
			err = ch.UnmarshalBinary(rawChain.Raw)
			if err != nil {
				return fmt.Errorf("unmarshal chain: %w", err)
			}
			if bytes.Equal(ch.ID, expectedCh.ID) {
				// At the moment, according to the core team, there is no way through the API
				// to check whether the APE chain stored in the contact has been applied to the container.
				// So after we make sure that the APE chain is stored, we just wait for a certain period of time
				// (8 seconds by default, the time until the next block and cache invalidation)
				select {
				case <-ctx.Done():
					return checkCtxDone(ctx)
				case <-time.After(time.Duration(f.opt.APECacheInvalidationDuration)):
					return nil
				}
			}
		}
		// Chain not visible yet; wait before the next poll.
		select {
		case <-ctx.Done():
			return checkCtxDone(ctx)
		case <-time.After(time.Duration(f.opt.APEChainCheckInterval)):
		}
	}
}
// createContainer creates a new named container with the configured
// placement policy, attaches the backend's APE access rules, and waits for
// the rules to become effective before returning the new container ID.
func (f *Fs) createContainer(ctx context.Context, rootDirName string) (cid.ID, error) {
	var policy netmap.PlacementPolicy
	if err := policy.DecodeString(f.opt.PlacementPolicy); err != nil {
		return cid.ID{}, fmt.Errorf("parse placement policy: %w", err)
	}
	var cnr container.Container
	cnr.Init()
	cnr.SetPlacementPolicy(policy)
	cnr.SetOwner(*f.owner)
	container.SetCreationTime(&cnr, time.Now())
	container.SetName(&cnr, rootDirName)
	// Register the name/zone pair so the container is resolvable by name.
	cnrName, cnrZone := f.getContainerNameAndZone(rootDirName)
	var domain container.Domain
	domain.SetZone(cnrZone)
	domain.SetName(cnrName)
	container.WriteDomain(&cnr, domain)
	if err := pool.SyncContainerWithNetwork(ctx, &cnr, f.pool); err != nil {
		return cid.ID{}, fmt.Errorf("sync container with network: %w", err)
	}
	prm := pool.PrmContainerPut{
		ClientParams: sdkClient.PrmContainerPut{
			Container: &cnr,
		},
	}
	cnrID, err := f.pool.PutContainer(ctx, prm)
	if err != nil {
		return cnrID, fmt.Errorf("put container: %w", err)
	}
	// Attach the access rules derived from container_creation_policy.
	ch := chain.Chain{
		ID:    chain.ID("rclone/" + f.owner.String()),
		Rules: f.opt.APERules,
	}
	data, err := ch.MarshalBinary()
	if err != nil {
		return cid.ID{}, fmt.Errorf("marshal chain: %w", err)
	}
	prmAddAPEChain := pool.PrmAddAPEChain{
		Target: ape.ChainTarget{
			TargetType: ape.TargetTypeContainer,
			Name:       cnrID.EncodeToString(),
		},
		Chain: ape.Chain{Raw: data},
	}
	if err = f.pool.AddAPEChain(ctx, prmAddAPEChain); err != nil {
		return cid.ID{}, fmt.Errorf("add APE chain: %w", err)
	}
	// Block until the chain is stored and the cache has rolled over, so the
	// container is usable as soon as we return.
	if err = f.waitForAPECacheInvalidated(ctx, ch, cnrID); err != nil {
		return cid.ID{}, fmt.Errorf("wait for APE cache to be invalidated: %w", err)
	}
	return cnrID, nil
}
// Mkdir creates the container if it doesn't exist
//
// Subdirectories need no creation (directories are emulated), so only the
// container part of the path matters.
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	rootDirName, _ := bucket.Split(path.Join(f.root, dir))
	if rootDirName == "" {
		return nil
	}
	// A literal container ID needs no creation.
	if _, err := parseContainerID(rootDirName); err == nil {
		return nil
	}
	_, err := f.resolveOrCreateContainer(ctx, rootDirName)
	return err
}
// Rmdir deletes the bucket if the fs is at the root
//
// Only an empty container at the top level is removed; emulated
// subdirectories are ignored (nothing to delete).
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
	if rootDirName == "" || containerPath != "" {
		return nil
	}
	cnrID, err := f.parseContainer(ctx, rootDirName)
	if err != nil {
		return fs.ErrorDirNotFound
	}
	isEmpty, err := f.isContainerEmpty(ctx, cnrID)
	if err != nil {
		return err
	}
	if !isEmpty {
		return fs.ErrorDirectoryNotEmpty
	}
	prm := pool.PrmContainerDelete{
		ContainerID: cnrID,
	}
	// Lock so the cache entry and the container are removed atomically with
	// respect to concurrent resolve/create operations.
	f.m.Lock()
	defer f.m.Unlock()
	if err = f.pool.DeleteContainer(ctx, prm); err != nil {
		return fmt.Errorf("couldn't delete container %s '%s': %w", cnrID, rootDirName, err)
	}
	delete(f.containerIDCache, rootDirName)
	return nil
}
// Purge deletes all the files and directories including the old versions.
func (f *Fs) Purge(ctx context.Context, dir string) error {
	rootDirName, containerPath := bucket.Split(path.Join(f.root, dir))
	cnrID, err := f.parseContainer(ctx, rootDirName)
	if err != nil {
		// Container cannot be resolved: nothing to purge.
		return nil
	}
	if delErr := f.deleteByPrefix(ctx, cnrID, containerPath); delErr != nil {
		return fmt.Errorf("delete by prefix: %w", delErr)
	}
	return nil
}
// setRoot changes the root of the Fs
//
// The root is normalized (surrounding slashes trimmed) and split into its
// container and in-container directory parts.
func (f *Fs) setRoot(root string) {
	f.root = strings.Trim(root, "/")
	f.rootContainer, f.rootDirectory = bucket.Split(f.root)
}

// parseContainerID decodes containerStr as a literal container ID.
func parseContainerID(containerStr string) (cid.ID, error) {
	var cnrID cid.ID
	err := cnrID.DecodeString(containerStr)
	return cnrID, err
}
// getContainerIDByNameAndZone checks whether dirEntry refers to the container
// with the given name and zone. ok reports a match; on a match the ID is
// decoded from the entry's IDer (when it implements one).
func getContainerIDByNameAndZone(dirEntry fs.DirEntry, cnrName, cnrZone, defaultZone string) (cnrID cid.ID, ok bool, err error) {
	actualName, actualZone := getContainerNameAndZone(dirEntry.Remote(), defaultZone)
	if cnrName != actualName || cnrZone != actualZone {
		// Not the container we are looking for; ok stays false.
		return
	}
	var idEr fs.IDer
	if idEr, ok = dirEntry.(fs.IDer); ok {
		err = cnrID.DecodeString(idEr.ID())
	}
	return
}
// resolveContainerIDWithNNS resolves a container name/zone pair to a
// container ID via the NNS resolver.
func resolveContainerIDWithNNS(resolver *resolver.NNS, cnrName, cnrZone string) (cid.ID, error) {
	var d container.Domain
	d.SetZone(cnrZone)
	d.SetName(cnrName)
	cnrID, err := resolver.ResolveContainerDomain(d)
	if err != nil {
		// Wrap rather than discard the resolver's error so callers can see
		// why resolution failed (the original dropped it entirely).
		return cid.ID{}, fmt.Errorf("couldn't resolve container with name '%s' and zone '%s': %w", cnrName, cnrZone, err)
	}
	return cnrID, nil
}
// resolveCIDByRootDirName resolves a container name to its ID, preferring the
// NNS resolver when configured and falling back to scanning the owner's
// container list.
func (f *Fs) resolveCIDByRootDirName(ctx context.Context, rootDirName string) (cid.ID, error) {
	cnrName, cnrZone := f.getContainerNameAndZone(rootDirName)
	if cnrName == "" {
		return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName)
	}
	if f.resolver != nil {
		return resolveContainerIDWithNNS(f.resolver, cnrName, cnrZone)
	}
	// No NNS resolver: look for a matching name/zone among our containers.
	// A listing error falls through to the generic failure below.
	if dirEntries, err := f.listContainers(ctx); err == nil {
		for _, dirEntry := range dirEntries {
			if cnrID, ok, err := getContainerIDByNameAndZone(dirEntry, cnrName, cnrZone, f.opt.DefaultContainerZone); ok {
				return cnrID, err
			}
		}
	}
	return cid.ID{}, fmt.Errorf("couldn't resolve container '%s'", rootDirName)
}
// resolveContainerIDHelper resolves a container name to its ID, consulting
// the in-memory cache first. Callers must hold f.m.
func (f *Fs) resolveContainerIDHelper(ctx context.Context, rootDirName string) (cid.ID, error) {
	if cached, found := f.containerIDCache[rootDirName]; found {
		return parseContainerID(cached)
	}
	cnrID, err := f.resolveCIDByRootDirName(ctx, rootDirName)
	if err != nil {
		return cid.ID{}, err
	}
	// Cache the resolved ID for subsequent lookups.
	f.containerIDCache[rootDirName] = cnrID.String()
	return cnrID, nil
}
// resolveContainerID is the lock-taking wrapper around
// resolveContainerIDHelper.
func (f *Fs) resolveContainerID(ctx context.Context, rootDirName string) (cid.ID, error) {
	f.m.Lock()
	defer f.m.Unlock()
	return f.resolveContainerIDHelper(ctx, rootDirName)
}

// parseContainer interprets rootDirName either as a literal container ID or,
// failing that, as a container name to be resolved.
func (f *Fs) parseContainer(ctx context.Context, rootDirName string) (cid.ID, error) {
	cnrID, err := parseContainerID(rootDirName)
	if err != nil {
		return f.resolveContainerID(ctx, rootDirName)
	}
	return cnrID, nil
}
// listEntries lists objects under containerPath inside the container named
// rootDirName. When recursive is false, first-level subpaths are collapsed
// into emulated directory entries. Duplicate file paths keep only the newest
// object (by Timestamp attribute).
func (f *Fs) listEntries(ctx context.Context, rootDirName, containerPath, directory string, recursive bool) (fs.DirEntries, error) {
	cnrID, err := f.parseContainer(ctx, rootDirName)
	if err != nil {
		return nil, fs.ErrorDirNotFound
	}
	ids, err := f.findObjectsPrefix(ctx, cnrID, containerPath)
	if err != nil {
		return nil, err
	}
	res := make([]fs.DirEntry, 0, len(ids))
	dirs := make(map[string]*fs.Dir)
	objs := make(map[string]*Object)
	for _, id := range ids {
		var prm pool.PrmObjectHead
		prm.SetAddress(newAddress(cnrID, id))
		obj, err := f.pool.HeadObject(ctx, prm)
		if err != nil {
			return nil, err
		}
		objInf := newObject(f, obj, rootDirName)
		if !recursive {
			withoutPath := strings.TrimPrefix(objInf.filePath, containerPath)
			trimPrefixSlash := strings.TrimPrefix(withoutPath, "/")
			// emulate directories
			if index := strings.Index(trimPrefixSlash, "/"); index >= 0 {
				// Remote paths are slash-separated; use path.Join rather than
				// filepath.Join, which would emit backslashes on Windows.
				dir := fs.NewDir(path.Join(directory, trimPrefixSlash[:index]), time.Time{})
				dir.SetID(path.Join(containerPath, dir.Remote()))
				dirs[dir.ID()] = dir
				continue
			}
		}
		// Keep only the newest object for each remote path.
		if o, ok := objs[objInf.remote]; !ok || o.timestamp.Before(objInf.timestamp) {
			objs[objInf.remote] = objInf
		}
	}
	for _, dir := range dirs {
		res = append(res, dir)
	}
	for _, obj := range objs {
		res = append(res, obj)
	}
	return res, nil
}
// listContainers returns one directory entry per container owned by the
// configured account.
func (f *Fs) listContainers(ctx context.Context) (fs.DirEntries, error) {
	prmList := pool.PrmContainerList{
		OwnerID: *f.owner,
	}
	containers, err := f.pool.ListContainers(ctx, prmList)
	if err != nil {
		return nil, err
	}
	res := make([]fs.DirEntry, len(containers))
	for i, containerID := range containers {
		// Fetch each container to obtain its name/zone for the entry.
		prm := pool.PrmContainerGet{
			ContainerID: containerID,
		}
		cnr, err := f.pool.GetContainer(ctx, prm)
		if err != nil {
			return nil, fmt.Errorf("couldn't get container '%s': %w", containerID, err)
		}
		res[i] = newDir(containerID, cnr, f.opt.DefaultContainerZone)
	}
	return res, nil
}
// findObjectsFilePath finds objects whose FilePath attribute equals filePath
// exactly.
func (f *Fs) findObjectsFilePath(ctx context.Context, cnrID cid.ID, filePath string) ([]oid.ID, error) {
	return f.findObjects(ctx, cnrID, searchFilter{
		Header:    object.AttributeFilePath,
		Value:     filePath,
		MatchType: object.MatchStringEqual,
	})
}

// findObjectsPrefix finds objects whose FilePath attribute starts with
// prefix.
func (f *Fs) findObjectsPrefix(ctx context.Context, cnrID cid.ID, prefix string) ([]oid.ID, error) {
	return f.findObjects(ctx, cnrID, searchFilter{
		Header:    object.AttributeFilePath,
		Value:     prefix,
		MatchType: object.MatchCommonPrefix,
	})
}
// findObjects searches the container for "root" (user-visible) objects
// matching all supplied attribute filters.
func (f *Fs) findObjects(ctx context.Context, cnrID cid.ID, filters ...searchFilter) ([]oid.ID, error) {
	sf := object.NewSearchFilters()
	sf.AddRootFilter()
	for i := range filters {
		sf.AddFilter(filters[i].Header, filters[i].Value, filters[i].MatchType)
	}
	return f.searchObjects(ctx, cnrID, sf)
}
// deleteByPrefix deletes every root object whose FilePath starts with
// prefix. Returns fs.ErrorDirNotFound when no matching object exists.
func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) error {
	filters := object.NewSearchFilters()
	filters.AddRootFilter()
	filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
	var prmSearch pool.PrmObjectSearch
	prmSearch.SetContainerID(cnrID)
	prmSearch.SetFilters(filters)
	res, err := f.pool.SearchObjects(ctx, prmSearch)
	if err != nil {
		return fmt.Errorf("init searching using client: %w", err)
	}
	defer res.Close()
	var (
		inErr     error // first delete error, preserved across Iterate's return
		found     bool  // at least one object matched the prefix
		prmDelete pool.PrmObjectDelete
		addr      oid.Address
	)
	addr.SetContainer(cnrID)
	err = res.Iterate(func(id oid.ID) bool {
		found = true
		addr.SetObject(id)
		prmDelete.SetAddress(addr)
		if err = f.pool.DeleteObject(ctx, prmDelete); err != nil {
			// Remember the delete error and stop iterating (return true).
			inErr = fmt.Errorf("delete object: %w", err)
			return true
		}
		return false
	})
	// Iterate's own error takes precedence; otherwise surface a delete error.
	if err == nil {
		err = inErr
	}
	if err != nil {
		return fmt.Errorf("iterate objects: %w", err)
	}
	if !found {
		return fs.ErrorDirNotFound
	}
	return nil
}
// isContainerEmpty reports whether the container holds no root objects.
func (f *Fs) isContainerEmpty(ctx context.Context, cnrID cid.ID) (bool, error) {
	filters := object.NewSearchFilters()
	filters.AddRootFilter()
	var prm pool.PrmObjectSearch
	prm.SetContainerID(cnrID)
	prm.SetFilters(filters)
	res, err := f.pool.SearchObjects(ctx, prm)
	if err != nil {
		return false, fmt.Errorf("init searching using client: %w", err)
	}
	defer res.Close()
	empty := true
	iterErr := res.Iterate(func(oid.ID) bool {
		empty = false
		return true // one object is enough; stop iterating
	})
	if iterErr != nil {
		return false, fmt.Errorf("iterate objects: %w", iterErr)
	}
	return empty, nil
}
// searchObjects runs an object search with the given filters and collects
// all matching object IDs.
func (f *Fs) searchObjects(ctx context.Context, cnrID cid.ID, filters object.SearchFilters) ([]oid.ID, error) {
	var prm pool.PrmObjectSearch
	prm.SetContainerID(cnrID)
	prm.SetFilters(filters)
	res, err := f.pool.SearchObjects(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("init searching using client: %w", err)
	}
	defer res.Close()
	var buf []oid.ID
	err = res.Iterate(func(id oid.ID) bool {
		buf = append(buf, id)
		// Returning false continues the iteration over all results.
		return false
	})
	if err != nil {
		return nil, fmt.Errorf("iterate objects: %w", err)
	}
	return buf, nil
}
// Check the interfaces are satisfied
//
// Compile-time assertions; a build failure here means a required method is
// missing or has the wrong signature.
var (
	_ fs.Fs          = &Fs{}
	_ fs.ListRer     = &Fs{}
	_ fs.Purger      = &Fs{}
	_ fs.PutStreamer = &Fs{}
	_ fs.Shutdowner  = &Fs{}
	_ fs.Object      = &Object{}
	_ fs.MimeTyper   = &Object{}
)