2024-12-21 16:09:24 +03:00
// Package frostfs provides an interface to FrostFS decentralized distributed object storage system
package frostfs
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"math"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
robject "github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
)
func init ( ) {
fs . Register ( & fs . RegInfo {
Name : "frostfs" ,
Description : "Distributed, decentralized object storage FrostFS" ,
NewFs : NewFs ,
Options : [ ] fs . Option {
{
Name : "endpoint" ,
Help : "Endpoints to connect to FrostFS node" ,
Required : true ,
Examples : [ ] fs . OptionExample {
{
Value : "s01.frostfs.devenv:8080" ,
Help : "One endpoint." ,
} ,
{
Value : "s01.frostfs.devenv:8080 s02.frostfs.devenv:8080" ,
Help : "Multiple endpoints to form pool." ,
} ,
{
Value : "s01.frostfs.devenv:8080,1 s02.frostfs.devenv:8080,2" ,
Help : "Multiple endpoints with priority (lower value has higher priority). Until s01 is healthy, all requests will be sent to it." ,
} ,
{
Value : "s01.frostfs.devenv:8080,1,1 s02.frostfs.devenv:8080,2,1 s03.frostfs.devenv:8080,2,9" ,
Help : "Multiple endpoints with priority and weights. After the s01 endpoint is unhealthy, requests will be sent to the s02 and s03 endpoints in proportions of 10% and 90%, respectively." ,
} ,
} ,
} ,
{
Name : "connection_timeout" ,
Default : 4 * time . Second ,
Help : "FrostFS connection timeout" ,
} ,
{
Name : "request_timeout" ,
Default : 12 * time . Second ,
Help : "FrostFS request timeout" ,
} ,
{
Name : "rebalance_interval" ,
Default : 15 * time . Second ,
Help : "FrostFS rebalance connections interval" ,
} ,
{
Name : "session_expiration" ,
Default : math . MaxUint32 ,
Help : "FrostFS session expiration epoch" ,
} ,
{
Name : "ape_cache_invalidation_duration" ,
Default : 8 * time . Second ,
Help : "APE cache invalidation duration" ,
} ,
{
Name : "ape_cache_invalidation_timeout" ,
Default : 24 * time . Second ,
Help : "APE cache invalidation timeout" ,
} ,
{
Name : "ape_chain_check_interval" ,
Default : 500 * time . Millisecond ,
Help : "The interval for verifying that the APE chain is saved in FrostFS." ,
} ,
{
Name : "rpc_endpoint" ,
Help : "Endpoint to connect to Neo rpc node" ,
} ,
{
Name : "wallet" ,
Help : "Path to wallet" ,
Required : true ,
} ,
{
Name : "address" ,
Help : "Address of account" ,
} ,
{
Name : "password" ,
Help : "Password to decrypt wallet" ,
} ,
{
Name : "placement_policy" ,
Default : "REP 3" ,
Help : "Placement policy for new containers" ,
Examples : [ ] fs . OptionExample {
{
Value : "REP 3" ,
Help : "Container will have 3 replicas" ,
} ,
} ,
} ,
2025-01-22 19:07:48 +03:00
{
Name : "default_container_zone" ,
Default : "container" ,
Help : "The name of the zone in which containers will be created or resolved if the zone name is not explicitly specified with the container name." ,
} ,
2024-12-21 16:09:24 +03:00
{
Name : "container_creation_policy" ,
Default : "private" ,
Help : "Container creation policy for new containers" ,
Examples : [ ] fs . OptionExample {
{
Value : "public-read-write" ,
Help : "Public container, anyone can read and write" ,
} ,
{
Value : "public-read" ,
Help : "Public container, owner can read and write, others can only read" ,
} ,
{
Value : "private" ,
Help : "Private container, only owner has access to it" ,
} ,
} ,
} ,
} ,
} )
}
// Options defines the configuration for this backend
type Options struct {
FrostfsEndpoint string ` config:"endpoint" `
FrostfsConnectionTimeout fs . Duration ` config:"connection_timeout" `
FrostfsRequestTimeout fs . Duration ` config:"request_timeout" `
FrostfsRebalanceInterval fs . Duration ` config:"rebalance_interval" `
FrostfsSessionExpiration uint64 ` config:"session_expiration" `
APECacheInvalidationDuration fs . Duration ` config:"ape_cache_invalidation_duration" `
APECacheInvalidationTimeout fs . Duration ` config:"ape_cache_invalidation_timeout" `
APEChainCheckInterval fs . Duration ` config:"ape_chain_check_interval" `
RPCEndpoint string ` config:"rpc_endpoint" `
Wallet string ` config:"wallet" `
Address string ` config:"address" `
Password string ` config:"password" `
PlacementPolicy string ` config:"placement_policy" `
2025-01-22 19:07:48 +03:00
DefaultContainerZone string ` config:"default_container_zone" `
2024-12-21 16:09:24 +03:00
ContainerCreationPolicy string ` config:"container_creation_policy" `
APERules [ ] chain . Rule ` config:"-" `
}
// Fs represents a remote frostfs server
type Fs struct {
name string // the name of the remote
root string // root of the bucket - ignore all objects above this
opt * Options
ctx context . Context
pool * pool . Pool
owner * user . ID
rootContainer string
rootDirectory string
features * fs . Features
resolver * resolver . NNS
m sync . Mutex
containerIDCache map [ string ] string
}
type searchFilter struct {
Header string
Value string
MatchType object . SearchMatchType
}
// Object describes a frostfs object
type Object struct {
* object . Object
fs * Fs
name string
remote string
filePath string
contentType string
timestamp time . Time
}
// Shutdown the backend, closing any background tasks and any
// cached connections.
func ( f * Fs ) Shutdown ( _ context . Context ) error {
f . pool . Close ( )
return nil
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
func ( f * Fs ) PutStream ( ctx context . Context , in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( fs . Object , error ) {
return f . Put ( ctx , in , src , options ... )
}
// NewFs creates a new Fs object from the name and root. It connects to
// the host specified in the config file.
func NewFs ( ctx context . Context , name string , root string , m configmap . Mapper ) ( fs . Fs , error ) {
opt := new ( Options )
err := configstruct . Set ( m , opt )
if err != nil {
return nil , err
}
opt . APERules , err = parseContainerCreationPolicyString ( opt . ContainerCreationPolicy )
if err != nil {
return nil , fmt . Errorf ( "couldn't parse container creation policy: %w" , err )
}
acc , err := getAccount ( opt )
if err != nil {
return nil , err
}
p , err := createPool ( ctx , acc . PrivateKey ( ) , opt )
if err != nil {
return nil , err
}
nnsResolver , err := createNNSResolver ( opt )
if err != nil {
return nil , err
}
var owner user . ID
user . IDFromKey ( & owner , acc . PrivateKey ( ) . PrivateKey . PublicKey )
f := & Fs {
name : name ,
opt : opt ,
ctx : ctx ,
pool : p ,
owner : & owner ,
resolver : nnsResolver ,
containerIDCache : make ( map [ string ] string ) ,
}
f . setRoot ( root )
f . features = ( & fs . Features {
DuplicateFiles : true ,
ReadMimeType : true ,
WriteMimeType : true ,
BucketBased : true ,
BucketBasedRootOK : true ,
} ) . Fill ( ctx , f )
if f . rootContainer != "" && f . rootDirectory != "" && ! strings . HasSuffix ( root , "/" ) {
// Check to see if the (container,directory) is actually an existing file
oldRoot := f . root
newRoot , leaf := path . Split ( oldRoot )
f . setRoot ( newRoot )
_ , err := f . NewObject ( ctx , leaf )
if err != nil {
// File doesn't exist or is a directory so return old f
f . setRoot ( oldRoot )
return f , nil
}
// return an error with a fs which points to the parent
return f , fs . ErrorIsFile
}
return f , nil
}
func newObject ( f * Fs , obj object . Object , container string ) * Object {
// we should not include rootDirectory into remote name
prefix := f . rootDirectory
if prefix != "" {
prefix += "/"
}
objID , _ := obj . ID ( )
objInfo := & Object {
Object : & obj ,
fs : f ,
name : objID . EncodeToString ( ) ,
}
for _ , attr := range obj . Attributes ( ) {
if attr . Key ( ) == object . AttributeFileName {
objInfo . name = attr . Value ( )
}
if attr . Key ( ) == object . AttributeFilePath {
objInfo . filePath = attr . Value ( )
}
if attr . Key ( ) == object . AttributeContentType {
objInfo . contentType = attr . Value ( )
}
if attr . Key ( ) == object . AttributeTimestamp {
value , err := strconv . ParseInt ( attr . Value ( ) , 10 , 64 )
if err != nil {
continue
}
objInfo . timestamp = time . Unix ( value , 0 )
}
}
if objInfo . filePath == "" {
objInfo . filePath = objInfo . name
}
objInfo . remote = objInfo . filePath
if strings . Contains ( objInfo . remote , prefix ) {
objInfo . remote = objInfo . remote [ len ( prefix ) : ]
if container != "" && f . rootContainer == "" {
// we should add container name to remote name if root is empty
objInfo . remote = path . Join ( container , objInfo . remote )
}
}
return objInfo
}
// MimeType of an Object if known, "" otherwise
func ( o * Object ) MimeType ( _ context . Context ) string {
return o . contentType
}
// String implements the Stringer interface
func ( o * Object ) String ( ) string {
if o == nil {
return "<nil>"
}
return o . filePath
}
// ID returns the ID of the Object if known, or "" if not
func ( o * Object ) ID ( ) string {
return o . objectID ( ) . EncodeToString ( )
}
func ( o * Object ) objectID ( ) oid . ID {
objID , _ := o . Object . ID ( )
return objID
}
// Remote returns the remote path
func ( o * Object ) Remote ( ) string {
return o . remote
}
// ModTime returns the modification time of the object
func ( o * Object ) ModTime ( _ context . Context ) time . Time {
return o . timestamp
}
// Size returns the size of an object in bytes
func ( o * Object ) Size ( ) int64 {
return int64 ( o . PayloadSize ( ) )
}
// Fs returns the parent Fs
func ( o * Object ) Fs ( ) fs . Info {
return o . fs
}
// Hash returns the SHA-256 hash of an object returning a lowercase hex string
func ( o * Object ) Hash ( _ context . Context , ty hash . Type ) ( string , error ) {
if ty != hash . SHA256 {
return "" , hash . ErrUnsupported
}
payloadCheckSum , _ := o . PayloadChecksum ( )
return hex . EncodeToString ( payloadCheckSum . Value ( ) ) , nil
}
// Storable says whether this object can be stored
func ( o * Object ) Storable ( ) bool {
return true
}
// SetModTime sets the modification time of the local fs object
func ( o * Object ) SetModTime ( context . Context , time . Time ) error {
return fs . ErrorCantSetModTime
}
// BuffCloser is wrapper to load files from frostfs.
type BuffCloser struct {
io . Reader
}
// Close implements the Closer interface
func ( bc * BuffCloser ) Close ( ) error {
return nil
}
// Open an object for read
func ( o * Object ) Open ( ctx context . Context , options ... fs . OpenOption ) ( io . ReadCloser , error ) {
var isRange bool
offset , length := uint64 ( 0 ) , o . PayloadSize ( )
fs . FixRangeOption ( options , int64 ( o . PayloadSize ( ) ) )
for _ , option := range options {
switch option := option . ( type ) {
case * fs . RangeOption :
isRange = true
offset = uint64 ( option . Start )
if option . End < 0 {
option . End = int64 ( o . PayloadSize ( ) ) + option . End
}
length = uint64 ( option . End - option . Start + 1 )
case * fs . SeekOption :
isRange = true
offset = uint64 ( option . Offset )
length = o . PayloadSize ( ) - uint64 ( option . Offset )
default :
if option . Mandatory ( ) {
fs . Logf ( o , "Unsupported mandatory option: %v" , option )
}
}
}
cnrID , _ := o . ContainerID ( )
addr := newAddress ( cnrID , o . objectID ( ) )
if isRange {
var prm pool . PrmObjectRange
prm . SetAddress ( addr )
prm . SetOffset ( offset )
prm . SetLength ( length )
rangeReader , err := o . fs . pool . ObjectRange ( ctx , prm )
if err != nil {
return nil , err
}
return & rangeReader , nil
}
var prm pool . PrmObjectGet
prm . SetAddress ( addr )
// we cannot use ObjectRange in this case because it panics if zero length range is requested
res , err := o . fs . pool . GetObject ( ctx , prm )
if err != nil {
return nil , fmt . Errorf ( "couldn't get object %s: %w" , addr , err )
}
return res . Payload , nil
}
// Name of the remote (as passed into NewFs)
func ( f * Fs ) Name ( ) string {
return f . name
}
// Root of the remote (as passed into NewFs)
func ( f * Fs ) Root ( ) string {
return f . root
}
// String converts this Fs to a string
func ( f * Fs ) String ( ) string {
if f . rootContainer == "" {
return "FrostFS root"
}
if f . rootDirectory == "" {
return fmt . Sprintf ( "FrostFS container %s" , f . rootContainer )
}
return fmt . Sprintf ( "FrostFS container %s path %s" , f . rootContainer , f . rootDirectory )
}
// Precision of the remote
func ( f * Fs ) Precision ( ) time . Duration {
return time . Second
}
// Hashes returns the supported hash sets.
func ( f * Fs ) Hashes ( ) hash . Set {
return hash . Set ( hash . SHA256 )
}
// Features returns the optional features of this Fs
func ( f * Fs ) Features ( ) * fs . Features {
return f . features
}
// List the objects and directories in dir into entries.
func ( f * Fs ) List ( ctx context . Context , dir string ) ( fs . DirEntries , error ) {
2025-01-22 19:07:48 +03:00
rootDirName , containerPath := bucket . Split ( path . Join ( f . root , dir ) )
2024-12-21 16:09:24 +03:00
2025-01-22 19:07:48 +03:00
if rootDirName == "" {
2024-12-21 16:09:24 +03:00
if containerPath != "" {
return nil , fs . ErrorListBucketRequired
}
return f . listContainers ( ctx )
}
2025-01-22 19:07:48 +03:00
return f . listEntries ( ctx , rootDirName , containerPath , dir , false )
2024-12-21 16:09:24 +03:00
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
func ( f * Fs ) ListR ( ctx context . Context , dir string , callback fs . ListRCallback ) error {
2025-01-22 19:07:48 +03:00
rootDirName , containerPath := bucket . Split ( path . Join ( f . root , dir ) )
2024-12-21 16:09:24 +03:00
list := walk . NewListRHelper ( callback )
2025-01-22 19:07:48 +03:00
if rootDirName == "" {
2024-12-21 16:09:24 +03:00
if containerPath != "" {
return fs . ErrorListBucketRequired
}
containers , err := f . listContainers ( ctx )
if err != nil {
return err
}
for _ , containerDir := range containers {
if err = f . listR ( ctx , list , containerDir . Remote ( ) , containerPath , dir ) ; err != nil {
return err
}
}
return list . Flush ( )
}
2025-01-22 19:07:48 +03:00
if err := f . listR ( ctx , list , rootDirName , containerPath , dir ) ; err != nil {
2024-12-21 16:09:24 +03:00
return err
}
return list . Flush ( )
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) listR ( ctx context . Context , list * walk . ListRHelper , rootDirName , containerPath , dir string ) error {
entries , err := f . listEntries ( ctx , rootDirName , containerPath , dir , true )
2024-12-21 16:09:24 +03:00
if err != nil {
return err
}
for _ , entry := range entries {
if err = list . Add ( entry ) ; err != nil {
return err
}
}
return nil
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) resolveOrCreateContainer ( ctx context . Context , rootDirName string ) ( cid . ID , error ) {
// Due to the fact that this method is called when performing "put" operations,
// which can be run in parallel in several goroutines,
// we need to use a global lock here so that if a requested container is missing,
// multiple goroutines do not attempt to create a container with the same name simultaneously,
// which could cause unexpected behavior.
2024-12-21 16:09:24 +03:00
f . m . Lock ( )
defer f . m . Unlock ( )
2025-01-22 19:07:48 +03:00
cnrID , err := f . resolveContainerIDHelper ( ctx , rootDirName )
2024-12-21 16:09:24 +03:00
if err == nil {
return cnrID , err
}
2025-01-22 19:07:48 +03:00
if cnrID , err = f . createContainer ( ctx , rootDirName ) ; err != nil {
delete ( f . containerIDCache , rootDirName )
2024-12-21 16:09:24 +03:00
return cid . ID { } , fmt . Errorf ( "createContainer: %w" , err )
}
2025-01-22 19:07:48 +03:00
f . containerIDCache [ rootDirName ] = cnrID . String ( )
2024-12-21 16:09:24 +03:00
return cnrID , nil
}
// Put the Object into the container
func ( f * Fs ) Put ( ctx context . Context , in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( fs . Object , error ) {
2025-01-22 19:07:48 +03:00
rootDirName , containerPath := bucket . Split ( filepath . Join ( f . root , src . Remote ( ) ) )
2024-12-21 16:09:24 +03:00
2025-01-22 19:07:48 +03:00
cnrID , err := parseContainerID ( rootDirName )
2024-12-21 16:09:24 +03:00
if err != nil {
2025-01-22 19:07:48 +03:00
if cnrID , err = f . resolveOrCreateContainer ( ctx , rootDirName ) ; err != nil {
2024-12-21 16:09:24 +03:00
return nil , err
}
}
ids , err := f . findObjectsFilePath ( ctx , cnrID , containerPath )
if err != nil {
return nil , err
}
headers := fillHeaders ( ctx , containerPath , src , options ... )
objHeader := formObject ( f . owner , cnrID , filepath . Base ( containerPath ) , headers )
var prmPut pool . PrmObjectPut
prmPut . SetHeader ( * objHeader )
prmPut . SetPayload ( in )
objID , err := f . pool . PutObject ( ctx , prmPut )
if err != nil {
return nil , err
}
var prmHead pool . PrmObjectHead
prmHead . SetAddress ( newAddress ( cnrID , objID . ObjectID ) )
obj , err := f . pool . HeadObject ( ctx , prmHead )
if err != nil {
return nil , err
}
for _ , id := range ids {
var prmDelete pool . PrmObjectDelete
prmDelete . SetAddress ( newAddress ( cnrID , id ) )
_ = f . pool . DeleteObject ( ctx , prmDelete )
}
return newObject ( f , obj , "" ) , nil
}
func fillHeaders ( ctx context . Context , filePath string , src fs . ObjectInfo , options ... fs . OpenOption ) map [ string ] string {
if mimeTyper , ok := src . ( fs . MimeTyper ) ; ok {
options = append ( options , & fs . HTTPOption {
Key : object . AttributeContentType ,
Value : mimeTyper . MimeType ( ctx ) ,
} )
}
headers := map [ string ] string { object . AttributeFilePath : filePath }
for _ , option := range options {
key , value := option . Header ( )
lowerKey := strings . ToLower ( key )
if len ( value ) == 0 {
continue
}
switch lowerKey {
case "" :
// ignore empty headers
case "content-type" :
headers [ object . AttributeContentType ] = value
case "timestamp" :
headers [ object . AttributeTimestamp ] = value
default :
if value != "" {
headers [ key ] = value
}
}
}
if headers [ object . AttributeTimestamp ] == "" {
headers [ object . AttributeTimestamp ] = strconv . FormatInt ( src . ModTime ( ctx ) . UTC ( ) . Unix ( ) , 10 )
}
return headers
}
// Update the Object from in with modTime and size
func ( o * Object ) Update ( ctx context . Context , in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) error {
// When updating an object, the path to it should not change.
src = robject . NewStaticObjectInfo ( o . Remote ( ) , src . ModTime ( ctx ) , src . Size ( ) , src . Storable ( ) , nil , src . Fs ( ) )
2025-01-22 19:07:48 +03:00
rootDirName , containerPath := bucket . Split ( filepath . Join ( o . fs . root , src . Remote ( ) ) )
2024-12-21 16:09:24 +03:00
var cnrID cid . ID
var err error
2025-01-22 19:07:48 +03:00
if cnrID , err = o . fs . parseContainer ( ctx , rootDirName ) ; err != nil {
2024-12-21 16:09:24 +03:00
return fmt . Errorf ( "parse container: %w" , err )
}
headers := fillHeaders ( ctx , containerPath , src , options ... )
objHeader := formObject ( o . fs . owner , cnrID , filepath . Base ( containerPath ) , headers )
var prmPatch pool . PrmObjectPatch
var rng object . Range
rng . SetOffset ( 0 )
rng . SetLength ( uint64 ( o . Size ( ) ) )
prmPatch . SetAddress ( newAddress ( cnrID , o . objectID ( ) ) )
prmPatch . SetNewAttributes ( objHeader . Attributes ( ) )
prmPatch . SetRange ( & rng )
prmPatch . SetPayloadReader ( in )
prmPatch . SetReplaceAttributes ( true )
patchRes , err := o . fs . pool . PatchObject ( ctx , prmPatch )
if err != nil {
return fmt . Errorf ( "patch object: %w" , err )
}
// If the object IDs match, it means we have updated the object itself,
// and the update process can be interrupted at this point.
if patchRes . ObjectID . Equals ( o . objectID ( ) ) {
return nil
}
// Since the PatchObject method does not delete the source object, we need to delete it explicitly.
if err = o . Remove ( ctx ) ; err != nil {
fs . Logf ( o , "couldn't remove old file after update '%s': %s" , o . objectID ( ) , err . Error ( ) )
}
var prmHead pool . PrmObjectHead
prmHead . SetAddress ( newAddress ( cnrID , patchRes . ObjectID ) )
obj , err := o . fs . pool . HeadObject ( ctx , prmHead )
if err != nil {
return fmt . Errorf ( "fetch head object: %w" , err )
}
objInfo := newObject ( o . fs , obj , "" )
o . filePath = objInfo . filePath
o . remote = objInfo . remote
o . name = objInfo . name
o . timestamp = objInfo . timestamp
o . Object = & obj
return nil
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) getContainerNameAndZone ( containerStr string ) ( string , string ) {
return getContainerNameAndZone ( containerStr , f . opt . DefaultContainerZone )
}
2024-12-21 16:09:24 +03:00
// Remove an object
func ( o * Object ) Remove ( ctx context . Context ) error {
cnrID , _ := o . ContainerID ( )
var prm pool . PrmObjectDelete
prm . SetAddress ( newAddress ( cnrID , o . objectID ( ) ) )
return o . fs . pool . DeleteObject ( ctx , prm )
}
// NewObject finds the Object at remote.
func ( f * Fs ) NewObject ( ctx context . Context , remote string ) ( fs . Object , error ) {
containerStr , containerPath := bucket . Split ( filepath . Join ( f . root , remote ) )
cnrID , err := f . parseContainer ( ctx , containerStr )
if err != nil {
return nil , fs . ErrorDirNotFound
}
ids , err := f . findObjectsFilePath ( ctx , cnrID , containerPath )
if err != nil {
return nil , err
}
if len ( ids ) == 0 {
return nil , fs . ErrorObjectNotFound
}
var prm pool . PrmObjectHead
prm . SetAddress ( newAddress ( cnrID , ids [ 0 ] ) )
obj , err := f . pool . HeadObject ( ctx , prm )
if err != nil {
return nil , fmt . Errorf ( "head object: %w" , err )
}
return newObject ( f , obj , "" ) , nil
}
func ( f * Fs ) waitForAPECacheInvalidated ( ctx context . Context , expectedCh chain . Chain , cnrID cid . ID ) error {
prmListAPEChains := pool . PrmListAPEChains {
Target : ape . ChainTarget {
TargetType : ape . TargetTypeContainer ,
Name : cnrID . EncodeToString ( ) ,
} ,
}
checkCtxDone := func ( ctx context . Context ) error {
if _ , ok := ctx . Deadline ( ) ; ok {
return fmt . Errorf ( "waiting time for storing APE chain has expired" )
}
return fmt . Errorf ( "waiting for the APE chain to be stored has been cancelled" )
}
ctx , cancel := context . WithTimeout ( ctx , time . Duration ( f . opt . APECacheInvalidationTimeout ) )
defer cancel ( )
for {
chains , err := f . pool . ListAPEChains ( ctx , prmListAPEChains )
if err != nil {
return fmt . Errorf ( "list APE chains: %w" , err )
}
for _ , rawChain := range chains {
var ch chain . Chain
err = ch . UnmarshalBinary ( rawChain . Raw )
if err != nil {
return fmt . Errorf ( "unmarshal chain: %w" , err )
}
if bytes . Equal ( ch . ID , expectedCh . ID ) {
// At the moment, according to the core team, there is no way through the API
// to check whether the APE chain stored in the contact has been applied to the container.
// So after we make sure that the APE chain is stored, we just wait for a certain period of time
// (8 seconds by default, the time until the next block and cache invalidation)
select {
case <- ctx . Done ( ) :
return checkCtxDone ( ctx )
case <- time . After ( time . Duration ( f . opt . APECacheInvalidationDuration ) ) :
return nil
}
}
}
select {
case <- ctx . Done ( ) :
return checkCtxDone ( ctx )
case <- time . After ( time . Duration ( f . opt . APEChainCheckInterval ) ) :
}
}
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) createContainer ( ctx context . Context , rootDirName string ) ( cid . ID , error ) {
2024-12-21 16:09:24 +03:00
var policy netmap . PlacementPolicy
if err := policy . DecodeString ( f . opt . PlacementPolicy ) ; err != nil {
return cid . ID { } , fmt . Errorf ( "parse placement policy: %w" , err )
}
var cnr container . Container
cnr . Init ( )
cnr . SetPlacementPolicy ( policy )
cnr . SetOwner ( * f . owner )
container . SetCreationTime ( & cnr , time . Now ( ) )
2025-01-22 19:07:48 +03:00
container . SetName ( & cnr , rootDirName )
2024-12-21 16:09:24 +03:00
2025-01-22 19:07:48 +03:00
cnrName , cnrZone := f . getContainerNameAndZone ( rootDirName )
2024-12-21 16:09:24 +03:00
var domain container . Domain
2025-01-22 19:07:48 +03:00
domain . SetZone ( cnrZone )
domain . SetName ( cnrName )
2024-12-21 16:09:24 +03:00
container . WriteDomain ( & cnr , domain )
if err := pool . SyncContainerWithNetwork ( ctx , & cnr , f . pool ) ; err != nil {
return cid . ID { } , fmt . Errorf ( "sync container with network: %w" , err )
}
prm := pool . PrmContainerPut {
ClientParams : sdkClient . PrmContainerPut {
Container : & cnr ,
} ,
}
cnrID , err := f . pool . PutContainer ( ctx , prm )
if err != nil {
return cnrID , fmt . Errorf ( "put container: %w" , err )
}
ch := chain . Chain {
ID : chain . ID ( "rclone/" + f . owner . String ( ) ) ,
Rules : f . opt . APERules ,
}
data , err := ch . MarshalBinary ( )
if err != nil {
return cid . ID { } , fmt . Errorf ( "marshal chain: %w" , err )
}
prmAddAPEChain := pool . PrmAddAPEChain {
Target : ape . ChainTarget {
TargetType : ape . TargetTypeContainer ,
Name : cnrID . EncodeToString ( ) ,
} ,
Chain : ape . Chain { Raw : data } ,
}
if err = f . pool . AddAPEChain ( ctx , prmAddAPEChain ) ; err != nil {
return cid . ID { } , fmt . Errorf ( "add APE chain: %w" , err )
}
if err = f . waitForAPECacheInvalidated ( ctx , ch , cnrID ) ; err != nil {
return cid . ID { } , fmt . Errorf ( "wait for APE cache to be invalidated: %w" , err )
}
return cnrID , nil
}
// Mkdir creates the container if it doesn't exist
func ( f * Fs ) Mkdir ( ctx context . Context , dir string ) error {
2025-01-22 19:07:48 +03:00
rootDirName , _ := bucket . Split ( path . Join ( f . root , dir ) )
if rootDirName == "" {
2024-12-21 16:09:24 +03:00
return nil
}
2025-01-22 19:07:48 +03:00
_ , err := parseContainerID ( rootDirName )
2024-12-21 16:09:24 +03:00
if err != nil {
2025-01-22 19:07:48 +03:00
if _ , err = f . resolveOrCreateContainer ( ctx , rootDirName ) ; err != nil {
2024-12-21 16:09:24 +03:00
return err
}
}
return nil
}
// Rmdir deletes the bucket if the fs is at the root
func ( f * Fs ) Rmdir ( ctx context . Context , dir string ) error {
2025-01-22 19:07:48 +03:00
rootDirName , containerPath := bucket . Split ( path . Join ( f . root , dir ) )
if rootDirName == "" || containerPath != "" {
2024-12-21 16:09:24 +03:00
return nil
}
2025-01-22 19:07:48 +03:00
cnrID , err := f . parseContainer ( ctx , rootDirName )
2024-12-21 16:09:24 +03:00
if err != nil {
return fs . ErrorDirNotFound
}
isEmpty , err := f . isContainerEmpty ( ctx , cnrID )
if err != nil {
return err
}
if ! isEmpty {
return fs . ErrorDirectoryNotEmpty
}
prm := pool . PrmContainerDelete {
ContainerID : cnrID ,
}
f . m . Lock ( )
defer f . m . Unlock ( )
if err = f . pool . DeleteContainer ( ctx , prm ) ; err != nil {
2025-01-22 19:07:48 +03:00
return fmt . Errorf ( "couldn't delete container %s '%s': %w" , cnrID , rootDirName , err )
2024-12-21 16:09:24 +03:00
}
2025-01-22 19:07:48 +03:00
delete ( f . containerIDCache , rootDirName )
2024-12-21 16:09:24 +03:00
return nil
}
// Purge deletes all the files and directories including the old versions.
func ( f * Fs ) Purge ( ctx context . Context , dir string ) error {
2025-01-22 19:07:48 +03:00
rootDirName , containerPath := bucket . Split ( path . Join ( f . root , dir ) )
2024-12-21 16:09:24 +03:00
2025-01-22 19:07:48 +03:00
cnrID , err := f . parseContainer ( ctx , rootDirName )
2024-12-21 16:09:24 +03:00
if err != nil {
return nil
}
if err = f . deleteByPrefix ( ctx , cnrID , containerPath ) ; err != nil {
return fmt . Errorf ( "delete by prefix: %w" , err )
}
return nil
}
// setRoot changes the root of the Fs
func ( f * Fs ) setRoot ( root string ) {
f . root = strings . Trim ( root , "/" )
f . rootContainer , f . rootDirectory = bucket . Split ( f . root )
}
func parseContainerID ( containerStr string ) ( cid . ID , error ) {
var cnrID cid . ID
err := cnrID . DecodeString ( containerStr )
return cnrID , err
}
2025-01-22 19:07:48 +03:00
func getContainerIDByNameAndZone ( dirEntry fs . DirEntry , cnrName , cnrZone , defaultZone string ) ( cnrID cid . ID , ok bool , err error ) {
actualName , actualZone := getContainerNameAndZone ( dirEntry . Remote ( ) , defaultZone )
if cnrName != actualName || cnrZone != actualZone {
2024-12-21 16:09:24 +03:00
return
}
var idEr fs . IDer
if idEr , ok = dirEntry . ( fs . IDer ) ; ok {
err = cnrID . DecodeString ( idEr . ID ( ) )
}
return
}
2025-01-22 19:07:48 +03:00
func resolveContainerIDWithNNS ( resolver * resolver . NNS , cnrName , cnrZone string ) ( cid . ID , error ) {
2024-12-21 16:09:24 +03:00
var d container . Domain
2025-01-22 19:07:48 +03:00
d . SetZone ( cnrZone )
d . SetName ( cnrName )
2024-12-21 16:09:24 +03:00
if cnrID , err := resolver . ResolveContainerDomain ( d ) ; err == nil {
return cnrID , err
}
2025-01-22 19:07:48 +03:00
return cid . ID { } , fmt . Errorf ( "couldn't resolve container with name '%s' and zone '%s'" , cnrName , cnrZone )
2024-12-21 16:09:24 +03:00
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) resolveCIDByRootDirName ( ctx context . Context , rootDirName string ) ( cid . ID , error ) {
cnrName , cnrZone := f . getContainerNameAndZone ( rootDirName )
if cnrName == "" {
return cid . ID { } , fmt . Errorf ( "couldn't resolve container '%s'" , rootDirName )
2024-12-21 16:09:24 +03:00
}
if f . resolver != nil {
2025-01-22 19:07:48 +03:00
return resolveContainerIDWithNNS ( f . resolver , cnrName , cnrZone )
2024-12-21 16:09:24 +03:00
}
if dirEntries , err := f . listContainers ( ctx ) ; err == nil {
for _ , dirEntry := range dirEntries {
2025-01-22 19:07:48 +03:00
if cnrID , ok , err := getContainerIDByNameAndZone ( dirEntry , cnrName , cnrZone , f . opt . DefaultContainerZone ) ; ok {
2024-12-21 16:09:24 +03:00
return cnrID , err
}
}
}
2025-01-22 19:07:48 +03:00
return cid . ID { } , fmt . Errorf ( "couldn't resolve container '%s'" , rootDirName )
2024-12-21 16:09:24 +03:00
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) resolveContainerIDHelper ( ctx context . Context , rootDirName string ) ( cid . ID , error ) {
cnrIDStr , ok := f . containerIDCache [ rootDirName ]
if ok {
return parseContainerID ( cnrIDStr )
}
cnrID , err := f . resolveCIDByRootDirName ( ctx , rootDirName )
if err != nil {
return cid . ID { } , err
}
f . containerIDCache [ rootDirName ] = cnrID . String ( )
return cnrID , nil
}
func ( f * Fs ) resolveContainerID ( ctx context . Context , rootDirName string ) ( cid . ID , error ) {
2024-12-21 16:09:24 +03:00
f . m . Lock ( )
defer f . m . Unlock ( )
2025-01-22 19:07:48 +03:00
return f . resolveContainerIDHelper ( ctx , rootDirName )
2024-12-21 16:09:24 +03:00
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) parseContainer ( ctx context . Context , rootDirName string ) ( cid . ID , error ) {
cnrID , err := parseContainerID ( rootDirName )
if err != nil {
return f . resolveContainerID ( ctx , rootDirName )
2024-12-21 16:09:24 +03:00
}
2025-01-22 19:07:48 +03:00
return cnrID , nil
2024-12-21 16:09:24 +03:00
}
2025-01-22 19:07:48 +03:00
func ( f * Fs ) listEntries ( ctx context . Context , rootDirName , containerPath , directory string , recursive bool ) ( fs . DirEntries , error ) {
cnrID , err := f . parseContainer ( ctx , rootDirName )
2024-12-21 16:09:24 +03:00
if err != nil {
return nil , fs . ErrorDirNotFound
}
ids , err := f . findObjectsPrefix ( ctx , cnrID , containerPath )
if err != nil {
return nil , err
}
res := make ( [ ] fs . DirEntry , 0 , len ( ids ) )
dirs := make ( map [ string ] * fs . Dir )
objs := make ( map [ string ] * Object )
for _ , id := range ids {
var prm pool . PrmObjectHead
prm . SetAddress ( newAddress ( cnrID , id ) )
obj , err := f . pool . HeadObject ( ctx , prm )
if err != nil {
return nil , err
}
2025-01-22 19:07:48 +03:00
objInf := newObject ( f , obj , rootDirName )
2024-12-21 16:09:24 +03:00
if ! recursive {
withoutPath := strings . TrimPrefix ( objInf . filePath , containerPath )
trimPrefixSlash := strings . TrimPrefix ( withoutPath , "/" )
// emulate directories
if index := strings . Index ( trimPrefixSlash , "/" ) ; index >= 0 {
dir := fs . NewDir ( filepath . Join ( directory , trimPrefixSlash [ : index ] ) , time . Time { } )
dir . SetID ( filepath . Join ( containerPath , dir . Remote ( ) ) )
dirs [ dir . ID ( ) ] = dir
continue
}
}
if o , ok := objs [ objInf . remote ] ; ! ok || o . timestamp . Before ( objInf . timestamp ) {
objs [ objInf . remote ] = objInf
}
}
for _ , dir := range dirs {
res = append ( res , dir )
}
for _ , obj := range objs {
res = append ( res , obj )
}
return res , nil
}
func ( f * Fs ) listContainers ( ctx context . Context ) ( fs . DirEntries , error ) {
prmList := pool . PrmContainerList {
OwnerID : * f . owner ,
}
containers , err := f . pool . ListContainers ( ctx , prmList )
if err != nil {
return nil , err
}
res := make ( [ ] fs . DirEntry , len ( containers ) )
for i , containerID := range containers {
prm := pool . PrmContainerGet {
ContainerID : containerID ,
}
cnr , err := f . pool . GetContainer ( ctx , prm )
if err != nil {
return nil , fmt . Errorf ( "couldn't get container '%s': %w" , containerID , err )
}
2025-01-22 19:07:48 +03:00
res [ i ] = newDir ( containerID , cnr , f . opt . DefaultContainerZone )
2024-12-21 16:09:24 +03:00
}
return res , nil
}
func ( f * Fs ) findObjectsFilePath ( ctx context . Context , cnrID cid . ID , filePath string ) ( [ ] oid . ID , error ) {
return f . findObjects ( ctx , cnrID , searchFilter {
Header : object . AttributeFilePath ,
Value : filePath ,
MatchType : object . MatchStringEqual ,
} )
}
func ( f * Fs ) findObjectsPrefix ( ctx context . Context , cnrID cid . ID , prefix string ) ( [ ] oid . ID , error ) {
return f . findObjects ( ctx , cnrID , searchFilter {
Header : object . AttributeFilePath ,
Value : prefix ,
MatchType : object . MatchCommonPrefix ,
} )
}
func ( f * Fs ) findObjects ( ctx context . Context , cnrID cid . ID , filters ... searchFilter ) ( [ ] oid . ID , error ) {
sf := object . NewSearchFilters ( )
sf . AddRootFilter ( )
for _ , filter := range filters {
sf . AddFilter ( filter . Header , filter . Value , filter . MatchType )
}
return f . searchObjects ( ctx , cnrID , sf )
}
func ( f * Fs ) deleteByPrefix ( ctx context . Context , cnrID cid . ID , prefix string ) error {
filters := object . NewSearchFilters ( )
filters . AddRootFilter ( )
filters . AddFilter ( object . AttributeFilePath , prefix , object . MatchCommonPrefix )
var prmSearch pool . PrmObjectSearch
prmSearch . SetContainerID ( cnrID )
prmSearch . SetFilters ( filters )
res , err := f . pool . SearchObjects ( ctx , prmSearch )
if err != nil {
return fmt . Errorf ( "init searching using client: %w" , err )
}
defer res . Close ( )
var (
inErr error
found bool
prmDelete pool . PrmObjectDelete
addr oid . Address
)
addr . SetContainer ( cnrID )
err = res . Iterate ( func ( id oid . ID ) bool {
found = true
addr . SetObject ( id )
prmDelete . SetAddress ( addr )
if err = f . pool . DeleteObject ( ctx , prmDelete ) ; err != nil {
inErr = fmt . Errorf ( "delete object: %w" , err )
return true
}
return false
} )
if err == nil {
err = inErr
}
if err != nil {
return fmt . Errorf ( "iterate objects: %w" , err )
}
if ! found {
return fs . ErrorDirNotFound
}
return nil
}
func ( f * Fs ) isContainerEmpty ( ctx context . Context , cnrID cid . ID ) ( bool , error ) {
filters := object . NewSearchFilters ( )
filters . AddRootFilter ( )
var prm pool . PrmObjectSearch
prm . SetContainerID ( cnrID )
prm . SetFilters ( filters )
res , err := f . pool . SearchObjects ( ctx , prm )
if err != nil {
return false , fmt . Errorf ( "init searching using client: %w" , err )
}
defer res . Close ( )
isEmpty := true
err = res . Iterate ( func ( id oid . ID ) bool {
isEmpty = false
return true
} )
if err != nil {
return false , fmt . Errorf ( "iterate objects: %w" , err )
}
return isEmpty , nil
}
func ( f * Fs ) searchObjects ( ctx context . Context , cnrID cid . ID , filters object . SearchFilters ) ( [ ] oid . ID , error ) {
var prm pool . PrmObjectSearch
prm . SetContainerID ( cnrID )
prm . SetFilters ( filters )
res , err := f . pool . SearchObjects ( ctx , prm )
if err != nil {
return nil , fmt . Errorf ( "init searching using client: %w" , err )
}
defer res . Close ( )
var buf [ ] oid . ID
err = res . Iterate ( func ( id oid . ID ) bool {
buf = append ( buf , id )
return false
} )
if err != nil {
return nil , fmt . Errorf ( "iterate objects: %w" , err )
}
return buf , nil
}
// Check the interfaces are satisfied
var (
_ fs . Fs = & Fs { }
_ fs . ListRer = & Fs { }
_ fs . Purger = & Fs { }
_ fs . PutStreamer = & Fs { }
_ fs . Shutdowner = & Fs { }
_ fs . Object = & Object { }
_ fs . MimeTyper = & Object { }
)