af260921c0
The improved upload logic is active by default in uplink v1.12.0, so the `testuplink.WithConcurrentSegmentUploadsDefaultConfig(ctx)` is not required anymore. See https://github.com/rclone/rclone/pull/7198
891 lines
25 KiB
Go
891 lines
25 KiB
Go
//go:build !plan9
|
|
// +build !plan9
|
|
|
|
// Package storj provides an interface to Storj decentralized object storage.
|
|
package storj
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/config"
|
|
"github.com/rclone/rclone/fs/config/configmap"
|
|
"github.com/rclone/rclone/fs/config/configstruct"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/lib/bucket"
|
|
"golang.org/x/text/unicode/norm"
|
|
|
|
"storj.io/uplink"
|
|
"storj.io/uplink/edge"
|
|
)
|
|
|
|
const (
|
|
existingProvider = "existing"
|
|
newProvider = "new"
|
|
)
|
|
|
|
var satMap = map[string]string{
|
|
"us1.storj.io": "12EayRS2V1kEsWESU9QMRseFhdxYxKicsiFmxrsLZHeLUtdps3S@us1.storj.io:7777",
|
|
"eu1.storj.io": "12L9ZFwhzVpuEKMUNUqkaTLGzwY9G24tbiigLiXpmZWKwmcNDDs@eu1.storj.io:7777",
|
|
"ap1.storj.io": "121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@ap1.storj.io:7777",
|
|
}
|
|
|
|
// Register with Fs
|
|
func init() {
|
|
fs.Register(&fs.RegInfo{
|
|
Name: "storj",
|
|
Description: "Storj Decentralized Cloud Storage",
|
|
Aliases: []string{"tardigrade"},
|
|
NewFs: NewFs,
|
|
Config: func(ctx context.Context, name string, m configmap.Mapper, configIn fs.ConfigIn) (*fs.ConfigOut, error) {
|
|
provider, _ := m.Get(fs.ConfigProvider)
|
|
|
|
config.FileDeleteKey(name, fs.ConfigProvider)
|
|
|
|
if provider == newProvider {
|
|
satelliteString, _ := m.Get("satellite_address")
|
|
apiKey, _ := m.Get("api_key")
|
|
passphrase, _ := m.Get("passphrase")
|
|
|
|
// satelliteString contains always default and passphrase can be empty
|
|
if apiKey == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
satellite, found := satMap[satelliteString]
|
|
if !found {
|
|
satellite = satelliteString
|
|
}
|
|
|
|
access, err := uplink.RequestAccessWithPassphrase(context.TODO(), satellite, apiKey, passphrase)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't create access grant: %w", err)
|
|
}
|
|
|
|
serializedAccess, err := access.Serialize()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't serialize access grant: %w", err)
|
|
}
|
|
m.Set("satellite_address", satellite)
|
|
m.Set("access_grant", serializedAccess)
|
|
} else if provider == existingProvider {
|
|
config.FileDeleteKey(name, "satellite_address")
|
|
config.FileDeleteKey(name, "api_key")
|
|
config.FileDeleteKey(name, "passphrase")
|
|
} else {
|
|
return nil, fmt.Errorf("invalid provider type: %s", provider)
|
|
}
|
|
return nil, nil
|
|
},
|
|
Options: []fs.Option{
|
|
{
|
|
Name: fs.ConfigProvider,
|
|
Help: "Choose an authentication method.",
|
|
Default: existingProvider,
|
|
Examples: []fs.OptionExample{{
|
|
Value: "existing",
|
|
Help: "Use an existing access grant.",
|
|
}, {
|
|
Value: newProvider,
|
|
Help: "Create a new access grant from satellite address, API key, and passphrase.",
|
|
},
|
|
}},
|
|
{
|
|
Name: "access_grant",
|
|
Help: "Access grant.",
|
|
Provider: "existing",
|
|
Sensitive: true,
|
|
},
|
|
{
|
|
Name: "satellite_address",
|
|
Help: "Satellite address.\n\nCustom satellite address should match the format: `<nodeid>@<address>:<port>`.",
|
|
Provider: newProvider,
|
|
Default: "us1.storj.io",
|
|
Examples: []fs.OptionExample{{
|
|
Value: "us1.storj.io",
|
|
Help: "US1",
|
|
}, {
|
|
Value: "eu1.storj.io",
|
|
Help: "EU1",
|
|
}, {
|
|
Value: "ap1.storj.io",
|
|
Help: "AP1",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "api_key",
|
|
Help: "API key.",
|
|
Provider: newProvider,
|
|
Sensitive: true,
|
|
},
|
|
{
|
|
Name: "passphrase",
|
|
Help: "Encryption passphrase.\n\nTo access existing objects enter passphrase used for uploading.",
|
|
Provider: newProvider,
|
|
Sensitive: true,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Options defines the configuration for this backend
|
|
type Options struct {
|
|
Access string `config:"access_grant"`
|
|
|
|
SatelliteAddress string `config:"satellite_address"`
|
|
APIKey string `config:"api_key"`
|
|
Passphrase string `config:"passphrase"`
|
|
}
|
|
|
|
// Fs represents a remote to Storj
|
|
type Fs struct {
|
|
name string // the name of the remote
|
|
root string // root of the filesystem
|
|
|
|
opts Options // parsed options
|
|
features *fs.Features // optional features
|
|
|
|
access *uplink.Access // parsed scope
|
|
|
|
project *uplink.Project // project client
|
|
}
|
|
|
|
// Check the interfaces are satisfied.
|
|
var (
|
|
_ fs.Fs = &Fs{}
|
|
_ fs.ListRer = &Fs{}
|
|
_ fs.PutStreamer = &Fs{}
|
|
_ fs.Mover = &Fs{}
|
|
_ fs.Copier = &Fs{}
|
|
_ fs.Purger = &Fs{}
|
|
_ fs.PublicLinker = &Fs{}
|
|
)
|
|
|
|
// NewFs creates a filesystem backed by Storj.
|
|
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (_ fs.Fs, err error) {
|
|
// Setup filesystem and connection to Storj
|
|
root = norm.NFC.String(root)
|
|
root = strings.Trim(root, "/")
|
|
|
|
f := &Fs{
|
|
name: name,
|
|
root: root,
|
|
}
|
|
|
|
// Parse config into Options struct
|
|
err = configstruct.Set(m, &f.opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Parse access
|
|
var access *uplink.Access
|
|
|
|
if f.opts.Access != "" {
|
|
access, err = uplink.ParseAccess(f.opts.Access)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storj: access: %w", err)
|
|
}
|
|
}
|
|
|
|
if access == nil && f.opts.SatelliteAddress != "" && f.opts.APIKey != "" && f.opts.Passphrase != "" {
|
|
access, err = uplink.RequestAccessWithPassphrase(ctx, f.opts.SatelliteAddress, f.opts.APIKey, f.opts.Passphrase)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storj: access: %w", err)
|
|
}
|
|
|
|
serializedAccess, err := access.Serialize()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storj: access: %w", err)
|
|
}
|
|
|
|
err = config.SetValueAndSave(f.name, "access_grant", serializedAccess)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storj: access: %w", err)
|
|
}
|
|
}
|
|
|
|
if access == nil {
|
|
return nil, errors.New("access not found")
|
|
}
|
|
|
|
f.access = access
|
|
|
|
f.features = (&fs.Features{
|
|
BucketBased: true,
|
|
BucketBasedRootOK: true,
|
|
}).Fill(ctx, f)
|
|
|
|
project, err := f.connect(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f.project = project
|
|
|
|
// Root validation needs to check the following: If a bucket path is
|
|
// specified and exists, then the object must be a directory.
|
|
//
|
|
// NOTE: At this point this must return the filesystem object we've
|
|
// created so far even if there is an error.
|
|
if root != "" {
|
|
bucketName, bucketPath := bucket.Split(root)
|
|
|
|
if bucketName != "" && bucketPath != "" {
|
|
_, err = project.StatBucket(ctx, bucketName)
|
|
if err != nil {
|
|
return f, fmt.Errorf("storj: bucket: %w", err)
|
|
}
|
|
|
|
object, err := project.StatObject(ctx, bucketName, bucketPath)
|
|
if err == nil {
|
|
if !object.IsPrefix {
|
|
// If the root is actually a file we
|
|
// need to return the *parent*
|
|
// directory of the root instead and an
|
|
// error that the original root
|
|
// requested is a file.
|
|
newRoot := path.Dir(f.root)
|
|
if newRoot == "." {
|
|
newRoot = ""
|
|
}
|
|
f.root = newRoot
|
|
|
|
return f, fs.ErrorIsFile
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return f, nil
|
|
}
|
|
|
|
// connect opens a connection to Storj.
|
|
func (f *Fs) connect(ctx context.Context) (project *uplink.Project, err error) {
|
|
fs.Debugf(f, "connecting...")
|
|
defer fs.Debugf(f, "connected: %+v", err)
|
|
|
|
cfg := uplink.Config{
|
|
UserAgent: "rclone",
|
|
}
|
|
|
|
project, err = cfg.OpenProject(ctx, f.access)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storj: project: %w", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// absolute computes the absolute bucket name and path from the filesystem root
|
|
// and the relative path provided.
|
|
func (f *Fs) absolute(relative string) (bucketName, bucketPath string) {
|
|
bn, bp := bucket.Split(path.Join(f.root, relative))
|
|
|
|
// NOTE: Technically libuplink does not care about the encoding. It is
|
|
// happy to work with them as opaque byte sequences. However, rclone
|
|
// has a test that requires two paths with the same normalized form
|
|
// (but different un-normalized forms) to point to the same file. This
|
|
// means we have to normalize before we interact with libuplink.
|
|
return norm.NFC.String(bn), norm.NFC.String(bp)
|
|
}
|
|
|
|
// Name of the remote (as passed into NewFs)
|
|
func (f *Fs) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Root of the remote (as passed into NewFs)
|
|
func (f *Fs) Root() string {
|
|
return f.root
|
|
}
|
|
|
|
// String returns a description of the FS
|
|
func (f *Fs) String() string {
|
|
return fmt.Sprintf("FS sj://%s", f.root)
|
|
}
|
|
|
|
// Precision of the ModTimes in this Fs
|
|
func (f *Fs) Precision() time.Duration {
|
|
return time.Nanosecond
|
|
}
|
|
|
|
// Hashes returns the supported hash types of the filesystem.
|
|
func (f *Fs) Hashes() hash.Set {
|
|
return hash.NewHashSet()
|
|
}
|
|
|
|
// Features returns the optional features of this Fs
|
|
func (f *Fs) Features() *fs.Features {
|
|
return f.features
|
|
}
|
|
|
|
// List the objects and directories in relative into entries. The entries can
|
|
// be returned in any order but should be for a complete directory.
|
|
//
|
|
// relative should be "" to list the root, and should not have trailing
|
|
// slashes.
|
|
//
|
|
// This should return fs.ErrDirNotFound if the directory isn't found.
|
|
func (f *Fs) List(ctx context.Context, relative string) (entries fs.DirEntries, err error) {
|
|
fs.Debugf(f, "ls ./%s", relative)
|
|
|
|
bucketName, bucketPath := f.absolute(relative)
|
|
|
|
defer func() {
|
|
if errors.Is(err, uplink.ErrBucketNotFound) {
|
|
err = fs.ErrorDirNotFound
|
|
}
|
|
}()
|
|
|
|
if bucketName == "" {
|
|
if bucketPath != "" {
|
|
return nil, fs.ErrorListBucketRequired
|
|
}
|
|
|
|
return f.listBuckets(ctx)
|
|
}
|
|
|
|
return f.listObjects(ctx, relative, bucketName, bucketPath)
|
|
}
|
|
|
|
func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) {
|
|
fs.Debugf(f, "BKT ls")
|
|
|
|
buckets := f.project.ListBuckets(ctx, nil)
|
|
|
|
for buckets.Next() {
|
|
bucket := buckets.Item()
|
|
|
|
entries = append(entries, fs.NewDir(bucket.Name, bucket.Created))
|
|
}
|
|
|
|
return entries, buckets.Err()
|
|
}
|
|
|
|
// newDirEntry creates a directory entry from an uplink object.
|
|
//
|
|
// NOTE: Getting the exact behavior required by rclone is somewhat tricky. The
|
|
// path manipulation here is necessary to cover all the different ways the
|
|
// filesystem and object could be initialized and combined.
|
|
func (f *Fs) newDirEntry(relative, prefix string, object *uplink.Object) fs.DirEntry {
|
|
if object.IsPrefix {
|
|
// . The entry must include the relative path as its prefix. Depending on
|
|
// | what is being listed and how the filesystem root was initialized the
|
|
// | relative path may be empty (and so we use path joining here to ensure
|
|
// | we don't end up with an empty path segment).
|
|
// |
|
|
// | . Remove the prefix used during listing.
|
|
// | |
|
|
// | | . Remove the trailing slash.
|
|
// | | |
|
|
// v v v
|
|
return fs.NewDir(path.Join(relative, object.Key[len(prefix):len(object.Key)-1]), object.System.Created)
|
|
}
|
|
|
|
return newObjectFromUplink(f, relative, object)
|
|
}
|
|
|
|
func (f *Fs) listObjects(ctx context.Context, relative, bucketName, bucketPath string) (entries fs.DirEntries, err error) {
|
|
fs.Debugf(f, "OBJ ls ./%s (%q, %q)", relative, bucketName, bucketPath)
|
|
|
|
opts := &uplink.ListObjectsOptions{
|
|
Prefix: newPrefix(bucketPath),
|
|
|
|
System: true,
|
|
Custom: true,
|
|
}
|
|
fs.Debugf(f, "opts %+v", opts)
|
|
|
|
objects := f.project.ListObjects(ctx, bucketName, opts)
|
|
|
|
for objects.Next() {
|
|
entries = append(entries, f.newDirEntry(relative, opts.Prefix, objects.Item()))
|
|
}
|
|
|
|
err = objects.Err()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return entries, nil
|
|
}
|
|
|
|
// ListR lists the objects and directories of the Fs starting from dir
|
|
// recursively into out.
|
|
//
|
|
// relative should be "" to start from the root, and should not have trailing
|
|
// slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't found.
|
|
//
|
|
// It should call callback for each tranche of entries read. These need not be
|
|
// returned in any particular order. If callback returns an error then the
|
|
// listing will stop immediately.
|
|
//
|
|
// Don't implement this unless you have a more efficient way of listing
|
|
// recursively that doing a directory traversal.
|
|
func (f *Fs) ListR(ctx context.Context, relative string, callback fs.ListRCallback) (err error) {
|
|
fs.Debugf(f, "ls -R ./%s", relative)
|
|
|
|
bucketName, bucketPath := f.absolute(relative)
|
|
|
|
defer func() {
|
|
if errors.Is(err, uplink.ErrBucketNotFound) {
|
|
err = fs.ErrorDirNotFound
|
|
}
|
|
}()
|
|
|
|
if bucketName == "" {
|
|
if bucketPath != "" {
|
|
return fs.ErrorListBucketRequired
|
|
}
|
|
|
|
return f.listBucketsR(ctx, callback)
|
|
}
|
|
|
|
return f.listObjectsR(ctx, relative, bucketName, bucketPath, callback)
|
|
}
|
|
|
|
func (f *Fs) listBucketsR(ctx context.Context, callback fs.ListRCallback) (err error) {
|
|
fs.Debugf(f, "BKT ls -R")
|
|
|
|
buckets := f.project.ListBuckets(ctx, nil)
|
|
|
|
for buckets.Next() {
|
|
bucket := buckets.Item()
|
|
|
|
err = f.listObjectsR(ctx, bucket.Name, bucket.Name, "", callback)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return buckets.Err()
|
|
}
|
|
|
|
func (f *Fs) listObjectsR(ctx context.Context, relative, bucketName, bucketPath string, callback fs.ListRCallback) (err error) {
|
|
fs.Debugf(f, "OBJ ls -R ./%s (%q, %q)", relative, bucketName, bucketPath)
|
|
|
|
opts := &uplink.ListObjectsOptions{
|
|
Prefix: newPrefix(bucketPath),
|
|
Recursive: true,
|
|
|
|
System: true,
|
|
Custom: true,
|
|
}
|
|
|
|
objects := f.project.ListObjects(ctx, bucketName, opts)
|
|
|
|
for objects.Next() {
|
|
object := objects.Item()
|
|
|
|
err = callback(fs.DirEntries{f.newDirEntry(relative, opts.Prefix, object)})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err = objects.Err()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewObject finds the Object at relative. If it can't be found it returns the
|
|
// error ErrorObjectNotFound.
|
|
func (f *Fs) NewObject(ctx context.Context, relative string) (_ fs.Object, err error) {
|
|
fs.Debugf(f, "stat ./%s", relative)
|
|
|
|
bucketName, bucketPath := f.absolute(relative)
|
|
|
|
object, err := f.project.StatObject(ctx, bucketName, bucketPath)
|
|
if err != nil {
|
|
fs.Debugf(f, "err: %+v", err)
|
|
|
|
if errors.Is(err, uplink.ErrObjectNotFound) {
|
|
return nil, fs.ErrorObjectNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return newObjectFromUplink(f, relative, object), nil
|
|
}
|
|
|
|
// Put in to the remote path with the modTime given of the given size
|
|
//
|
|
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
|
|
// But for unknown-sized objects (indicated by src.Size() == -1), Put should
|
|
// either return an error or upload it properly (rather than e.g. calling
|
|
// panic).
|
|
//
|
|
// May create the object even if it returns an error - if so will return the
|
|
// object and the error, otherwise will return nil and the error
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (_ fs.Object, err error) {
|
|
return f.put(ctx, in, src, src.Remote(), options...)
|
|
}
|
|
|
|
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options ...fs.OpenOption) (_ fs.Object, err error) {
|
|
fs.Debugf(f, "cp input ./%s # %+v %d", remote, options, src.Size())
|
|
|
|
// Reject options we don't support.
|
|
for _, option := range options {
|
|
if option.Mandatory() {
|
|
fs.Errorf(f, "Unsupported mandatory option: %v", option)
|
|
|
|
return nil, errors.New("unsupported mandatory option")
|
|
}
|
|
}
|
|
|
|
bucketName, bucketPath := f.absolute(remote)
|
|
|
|
upload, err := f.project.UploadObject(ctx, bucketName, bucketPath, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
aerr := upload.Abort()
|
|
if aerr != nil && !errors.Is(aerr, uplink.ErrUploadDone) {
|
|
fs.Errorf(f, "cp input ./%s %+v: %+v", remote, options, aerr)
|
|
}
|
|
}
|
|
}()
|
|
|
|
err = upload.SetCustomMetadata(ctx, uplink.CustomMetadata{
|
|
"rclone:mtime": src.ModTime(ctx).Format(time.RFC3339Nano),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = io.Copy(upload, in)
|
|
if err != nil {
|
|
if errors.Is(err, uplink.ErrBucketNotFound) {
|
|
// Rclone assumes the backend will create the bucket if not existing yet.
|
|
// Here we create the bucket and return a retry error for rclone to retry the upload.
|
|
_, err = f.project.EnsureBucket(ctx, bucketName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, fserrors.RetryError(errors.New("bucket was not available, now created, the upload must be retried"))
|
|
}
|
|
|
|
err = fserrors.RetryError(err)
|
|
fs.Errorf(f, "cp input ./%s %+v: %+v\n", remote, options, err)
|
|
|
|
return nil, err
|
|
}
|
|
|
|
err = upload.Commit()
|
|
if err != nil {
|
|
if errors.Is(err, uplink.ErrBucketNotFound) {
|
|
// Rclone assumes the backend will create the bucket if not existing yet.
|
|
// Here we create the bucket and return a retry error for rclone to retry the upload.
|
|
_, err = f.project.EnsureBucket(ctx, bucketName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = fserrors.RetryError(errors.New("bucket was not available, now created, the upload must be retried"))
|
|
} else if errors.Is(err, uplink.ErrTooManyRequests) {
|
|
// Storj has a rate limit of 1 per second of uploading to the same file.
|
|
// This produces ErrTooManyRequests here, so we wait 1 second and retry.
|
|
//
|
|
// See: https://github.com/storj/uplink/issues/149
|
|
fs.Debugf(f, "uploading too fast - sleeping for 1 second: %v", err)
|
|
time.Sleep(time.Second)
|
|
err = fserrors.RetryError(err)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return newObjectFromUplink(f, remote, upload.Info()), nil
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate
|
|
// size.
|
|
//
|
|
// May create the object even if it returns an error - if so will return the
|
|
// object and the error, otherwise will return nil and the error.
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (_ fs.Object, err error) {
|
|
return f.Put(ctx, in, src, options...)
|
|
}
|
|
|
|
// Mkdir makes the directory (container, bucket)
|
|
//
|
|
// Shouldn't return an error if it already exists
|
|
func (f *Fs) Mkdir(ctx context.Context, relative string) (err error) {
|
|
fs.Debugf(f, "mkdir -p ./%s", relative)
|
|
|
|
bucketName, _ := f.absolute(relative)
|
|
|
|
_, err = f.project.EnsureBucket(ctx, bucketName)
|
|
|
|
return err
|
|
}
|
|
|
|
// Rmdir removes the directory (container, bucket)
|
|
//
|
|
// NOTE: Despite code documentation to the contrary, this method should not
|
|
// return an error if the directory does not exist.
|
|
func (f *Fs) Rmdir(ctx context.Context, relative string) (err error) {
|
|
fs.Debugf(f, "rmdir ./%s", relative)
|
|
|
|
bucketName, bucketPath := f.absolute(relative)
|
|
|
|
if bucketPath != "" {
|
|
// If we can successfully stat it, then it is an object (and not a prefix).
|
|
_, err := f.project.StatObject(ctx, bucketName, bucketPath)
|
|
if err != nil {
|
|
if errors.Is(err, uplink.ErrObjectNotFound) {
|
|
// At this point we know it is not an object,
|
|
// but we don't know if it is a prefix for one.
|
|
//
|
|
// We check this by doing a listing and if we
|
|
// get any results back, then we know this is a
|
|
// valid prefix (which implies the directory is
|
|
// not empty).
|
|
opts := &uplink.ListObjectsOptions{
|
|
Prefix: newPrefix(bucketPath),
|
|
|
|
System: true,
|
|
Custom: true,
|
|
}
|
|
|
|
objects := f.project.ListObjects(ctx, bucketName, opts)
|
|
|
|
if objects.Next() {
|
|
return fs.ErrorDirectoryNotEmpty
|
|
}
|
|
|
|
return objects.Err()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return fs.ErrorIsFile
|
|
}
|
|
|
|
_, err = f.project.DeleteBucket(ctx, bucketName)
|
|
if err != nil {
|
|
if errors.Is(err, uplink.ErrBucketNotFound) {
|
|
return fs.ErrorDirNotFound
|
|
}
|
|
|
|
if errors.Is(err, uplink.ErrBucketNotEmpty) {
|
|
return fs.ErrorDirectoryNotEmpty
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// newPrefix returns a new prefix for listing conforming to the libuplink
|
|
// requirements. In particular, libuplink requires a trailing slash for
|
|
// listings, but rclone does not always provide one. Further, depending on how
|
|
// the path was initially path normalization may have removed it (e.g. a
|
|
// trailing slash from the CLI is removed before it ever gets to the backend
|
|
// code).
|
|
func newPrefix(prefix string) string {
|
|
if prefix == "" {
|
|
return prefix
|
|
}
|
|
|
|
if prefix[len(prefix)-1] == '/' {
|
|
return prefix
|
|
}
|
|
|
|
return prefix + "/"
|
|
}
|
|
|
|
// Move src to this remote using server-side move operations.
|
|
//
|
|
// This is stored with the remote path given.
|
|
//
|
|
// It returns the destination Object and a possible error.
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantMove
|
|
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
srcObj, ok := src.(*Object)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't move - not same remote type")
|
|
return nil, fs.ErrorCantMove
|
|
}
|
|
|
|
// Move parameters
|
|
srcBucket, srcKey := bucket.Split(srcObj.absolute)
|
|
dstBucket, dstKey := f.absolute(remote)
|
|
options := uplink.MoveObjectOptions{}
|
|
|
|
// Do the move
|
|
err := f.project.MoveObject(ctx, srcBucket, srcKey, dstBucket, dstKey, &options)
|
|
if err != nil {
|
|
// Make sure destination bucket exists
|
|
_, err := f.project.EnsureBucket(ctx, dstBucket)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("rename object failed to create destination bucket: %w", err)
|
|
}
|
|
// And try again
|
|
err = f.project.MoveObject(ctx, srcBucket, srcKey, dstBucket, dstKey, &options)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("rename object failed: %w", err)
|
|
}
|
|
}
|
|
|
|
// Read the new object
|
|
return f.NewObject(ctx, remote)
|
|
}
|
|
|
|
// Copy src to this remote using server-side copy operations.
|
|
//
|
|
// This is stored with the remote path given.
|
|
//
|
|
// It returns the destination Object and a possible error.
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantCopy
|
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
srcObj, ok := src.(*Object)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't copy - not same remote type")
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
|
|
// Copy parameters
|
|
srcBucket, srcKey := bucket.Split(srcObj.absolute)
|
|
dstBucket, dstKey := f.absolute(remote)
|
|
options := uplink.CopyObjectOptions{}
|
|
|
|
// Do the copy
|
|
newObject, err := f.project.CopyObject(ctx, srcBucket, srcKey, dstBucket, dstKey, &options)
|
|
if err != nil {
|
|
// Make sure destination bucket exists
|
|
_, err := f.project.EnsureBucket(ctx, dstBucket)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("copy object failed to create destination bucket: %w", err)
|
|
}
|
|
// And try again
|
|
newObject, err = f.project.CopyObject(ctx, srcBucket, srcKey, dstBucket, dstKey, &options)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("copy object failed: %w", err)
|
|
}
|
|
}
|
|
|
|
// Return the new object
|
|
return newObjectFromUplink(f, remote, newObject), nil
|
|
}
|
|
|
|
// Purge all files in the directory specified
|
|
//
|
|
// Implement this if you have a way of deleting all the files
|
|
// quicker than just running Remove() on the result of List()
|
|
//
|
|
// Return an error if it doesn't exist
|
|
func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|
bucket, directory := f.absolute(dir)
|
|
if bucket == "" {
|
|
return errors.New("can't purge from root")
|
|
}
|
|
|
|
if directory == "" {
|
|
_, err := f.project.DeleteBucketWithObjects(ctx, bucket)
|
|
if errors.Is(err, uplink.ErrBucketNotFound) {
|
|
return fs.ErrorDirNotFound
|
|
}
|
|
return err
|
|
}
|
|
|
|
fs.Infof(directory, "Quick delete is available only for entire bucket. Falling back to list and delete.")
|
|
objects := f.project.ListObjects(ctx, bucket,
|
|
&uplink.ListObjectsOptions{
|
|
Prefix: directory + "/",
|
|
Recursive: true,
|
|
},
|
|
)
|
|
if err := objects.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
empty := true
|
|
for objects.Next() {
|
|
empty = false
|
|
_, err := f.project.DeleteObject(ctx, bucket, objects.Item().Key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fs.Infof(objects.Item().Key, "Deleted")
|
|
}
|
|
|
|
if empty {
|
|
return fs.ErrorDirNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PublicLink generates a public link to the remote path (usually readable by anyone)
|
|
func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (string, error) {
|
|
bucket, key := f.absolute(remote)
|
|
if bucket == "" {
|
|
return "", errors.New("path must be specified")
|
|
}
|
|
|
|
// Rclone requires that a link is only generated if the remote path exists
|
|
if key == "" {
|
|
_, err := f.project.StatBucket(ctx, bucket)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
} else {
|
|
_, err := f.project.StatObject(ctx, bucket, key)
|
|
if err != nil {
|
|
if !errors.Is(err, uplink.ErrObjectNotFound) {
|
|
return "", err
|
|
}
|
|
// No object found, check if there is such a prefix
|
|
iter := f.project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{Prefix: key + "/"})
|
|
if iter.Err() != nil {
|
|
return "", iter.Err()
|
|
}
|
|
if !iter.Next() {
|
|
return "", err
|
|
}
|
|
}
|
|
}
|
|
|
|
sharedPrefix := uplink.SharePrefix{Bucket: bucket, Prefix: key}
|
|
|
|
permission := uplink.ReadOnlyPermission()
|
|
if expire.IsSet() {
|
|
permission.NotAfter = time.Now().Add(time.Duration(expire))
|
|
}
|
|
|
|
sharedAccess, err := f.access.Share(permission, sharedPrefix)
|
|
if err != nil {
|
|
return "", fmt.Errorf("sharing access to object failed: %w", err)
|
|
}
|
|
|
|
creds, err := (&edge.Config{
|
|
AuthServiceAddress: "auth.storjshare.io:7777",
|
|
}).RegisterAccess(ctx, sharedAccess, &edge.RegisterAccessOptions{Public: true})
|
|
if err != nil {
|
|
return "", fmt.Errorf("creating public link failed: %w", err)
|
|
}
|
|
|
|
return edge.JoinShareURL("https://link.storjshare.io", creds.AccessKeyID, bucket, key, nil)
|
|
}
|