forked from TrueCloudLab/rclone
- report errors if any - prevent double invocation - prefer saved hash type when many are supported
430 lines
12 KiB
Go
430 lines
12 KiB
Go
// Options for Open
|
|
|
|
package fs
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/lib/cacheroot"
|
|
)
|
|
|
|
// OpenOption is an interface describing options for Open
|
|
type OpenOption interface {
|
|
fmt.Stringer
|
|
|
|
// Header returns the option as an HTTP header
|
|
Header() (key string, value string)
|
|
|
|
// Mandatory returns whether this option can be ignored or not
|
|
Mandatory() bool
|
|
}
|
|
|
|
// RangeOption defines an HTTP Range option with start and end. If
|
|
// either start or end are < 0 then they will be omitted.
|
|
//
|
|
// End may be bigger than the Size of the object in which case it will
|
|
// be capped to the size of the object.
|
|
//
|
|
// Note that the End is inclusive, so to fetch 100 bytes you would use
|
|
// RangeOption{Start: 0, End: 99}
|
|
//
|
|
// If Start is specified but End is not then it will fetch from Start
|
|
// to the end of the file.
|
|
//
|
|
// If End is specified, but Start is not then it will fetch the last
|
|
// End bytes.
|
|
//
|
|
// Examples:
|
|
//
|
|
// RangeOption{Start: 0, End: 99} - fetch the first 100 bytes
|
|
// RangeOption{Start: 100, End: 199} - fetch the second 100 bytes
|
|
// RangeOption{Start: 100, End: -1} - fetch bytes from offset 100 to the end
|
|
// RangeOption{Start: -1, End: 100} - fetch the last 100 bytes
|
|
//
|
|
// A RangeOption implements a single byte-range-spec from
|
|
// https://tools.ietf.org/html/rfc7233#section-2.1
|
|
type RangeOption struct {
|
|
Start int64
|
|
End int64
|
|
}
|
|
|
|
// Header formats the option as an http header
|
|
func (o *RangeOption) Header() (key string, value string) {
|
|
key = "Range"
|
|
value = "bytes="
|
|
if o.Start >= 0 {
|
|
value += strconv.FormatInt(o.Start, 10)
|
|
|
|
}
|
|
value += "-"
|
|
if o.End >= 0 {
|
|
value += strconv.FormatInt(o.End, 10)
|
|
}
|
|
return key, value
|
|
}
|
|
|
|
// ParseRangeOption parses a RangeOption from a Range: header.
|
|
// It only accepts single ranges.
|
|
func ParseRangeOption(s string) (po *RangeOption, err error) {
|
|
const preamble = "bytes="
|
|
if !strings.HasPrefix(s, preamble) {
|
|
return nil, errors.New("Range: header invalid: doesn't start with " + preamble)
|
|
}
|
|
s = s[len(preamble):]
|
|
if strings.IndexRune(s, ',') >= 0 {
|
|
return nil, errors.New("Range: header invalid: contains multiple ranges which isn't supported")
|
|
}
|
|
dash := strings.IndexRune(s, '-')
|
|
if dash < 0 {
|
|
return nil, errors.New("Range: header invalid: contains no '-'")
|
|
}
|
|
start, end := strings.TrimSpace(s[:dash]), strings.TrimSpace(s[dash+1:])
|
|
o := RangeOption{Start: -1, End: -1}
|
|
if start != "" {
|
|
o.Start, err = strconv.ParseInt(start, 10, 64)
|
|
if err != nil || o.Start < 0 {
|
|
return nil, errors.New("Range: header invalid: bad start")
|
|
}
|
|
}
|
|
if end != "" {
|
|
o.End, err = strconv.ParseInt(end, 10, 64)
|
|
if err != nil || o.End < 0 {
|
|
return nil, errors.New("Range: header invalid: bad end")
|
|
}
|
|
}
|
|
return &o, nil
|
|
}
|
|
|
|
// String formats the option into human-readable form
|
|
func (o *RangeOption) String() string {
|
|
return fmt.Sprintf("RangeOption(%d,%d)", o.Start, o.End)
|
|
}
|
|
|
|
// Mandatory returns whether the option must be parsed or can be ignored
|
|
func (o *RangeOption) Mandatory() bool {
|
|
return true
|
|
}
|
|
|
|
// Decode interprets the RangeOption into an offset and a limit
|
|
//
|
|
// The offset is the start of the stream and the limit is how many
|
|
// bytes should be read from it. If the limit is -1 then the stream
|
|
// should be read to the end.
|
|
func (o *RangeOption) Decode(size int64) (offset, limit int64) {
|
|
if o.Start >= 0 {
|
|
offset = o.Start
|
|
if o.End >= 0 {
|
|
limit = o.End - o.Start + 1
|
|
} else {
|
|
limit = -1
|
|
}
|
|
} else {
|
|
if o.End >= 0 {
|
|
offset = size - o.End
|
|
} else {
|
|
offset = 0
|
|
}
|
|
limit = -1
|
|
}
|
|
return offset, limit
|
|
}
|
|
|
|
// FixRangeOption looks through the slice of options and adjusts any
|
|
// RangeOption~s found that request a fetch from the end into an
|
|
// absolute fetch using the size passed in and makes sure the range does
|
|
// not exceed filesize. Some remotes (e.g. Onedrive, Box) don't support
|
|
// range requests which index from the end.
|
|
func FixRangeOption(options []OpenOption, size int64) {
|
|
if size == 0 {
|
|
// if size 0 then remove RangeOption~s
|
|
// replacing with a NullOptions~s which won't be rendered
|
|
for i := range options {
|
|
if _, ok := options[i].(*RangeOption); ok {
|
|
options[i] = NullOption{}
|
|
|
|
}
|
|
}
|
|
return
|
|
}
|
|
for i := range options {
|
|
option := options[i]
|
|
if x, ok := option.(*RangeOption); ok {
|
|
// If start is < 0 then fetch from the end
|
|
if x.Start < 0 {
|
|
x = &RangeOption{Start: size - x.End, End: -1}
|
|
options[i] = x
|
|
}
|
|
if x.End > size {
|
|
x = &RangeOption{Start: x.Start, End: size - 1}
|
|
options[i] = x
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SeekOption defines an HTTP Range option with start only.
|
|
type SeekOption struct {
|
|
Offset int64
|
|
}
|
|
|
|
// Header formats the option as an http header
|
|
func (o *SeekOption) Header() (key string, value string) {
|
|
key = "Range"
|
|
value = fmt.Sprintf("bytes=%d-", o.Offset)
|
|
return key, value
|
|
}
|
|
|
|
// String formats the option into human-readable form
|
|
func (o *SeekOption) String() string {
|
|
return fmt.Sprintf("SeekOption(%d)", o.Offset)
|
|
}
|
|
|
|
// Mandatory returns whether the option must be parsed or can be ignored
|
|
func (o *SeekOption) Mandatory() bool {
|
|
return true
|
|
}
|
|
|
|
// HTTPOption defines a general purpose HTTP option
|
|
type HTTPOption struct {
|
|
Key string
|
|
Value string
|
|
}
|
|
|
|
// Header formats the option as an http header
|
|
func (o *HTTPOption) Header() (key string, value string) {
|
|
return o.Key, o.Value
|
|
}
|
|
|
|
// String formats the option into human-readable form
|
|
func (o *HTTPOption) String() string {
|
|
return fmt.Sprintf("HTTPOption(%q,%q)", o.Key, o.Value)
|
|
}
|
|
|
|
// Mandatory returns whether the option must be parsed or can be ignored
|
|
func (o *HTTPOption) Mandatory() bool {
|
|
return false
|
|
}
|
|
|
|
// HashesOption defines an option used to tell the local fs to limit
|
|
// the number of hashes it calculates.
|
|
type HashesOption struct {
|
|
Hashes hash.Set
|
|
}
|
|
|
|
// Header formats the option as an http header
|
|
func (o *HashesOption) Header() (key string, value string) {
|
|
return "", ""
|
|
}
|
|
|
|
// String formats the option into human-readable form
|
|
func (o *HashesOption) String() string {
|
|
return fmt.Sprintf("HashesOption(%v)", o.Hashes)
|
|
}
|
|
|
|
// Mandatory returns whether the option must be parsed or can be ignored
|
|
func (o *HashesOption) Mandatory() bool {
|
|
return false
|
|
}
|
|
|
|
// OptionResume defines a Put/Upload for doing resumes
|
|
type OptionResume struct {
|
|
ID string // resume this ID if set
|
|
Pos int64 // and resume from this position
|
|
Hash string
|
|
Src Object
|
|
F Fs
|
|
Remote string
|
|
CacheCleaned bool
|
|
CacheDir string
|
|
}
|
|
|
|
// SetID will be called by backend's Put/Update function if the object's upload
|
|
// could be resumed upon failure
|
|
//
|
|
// SetID takes the passed resume ID, hash state, hash name and Fingerprint of the object and stores it in
|
|
// --cache-dir so that future Copy operations can resume the upload if it fails
|
|
func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
|
|
ci := GetConfig(ctx)
|
|
// Get the Fingerprint of the src object so that future Copy operations can ensure the
|
|
// object hasn't changed before resuming an upload
|
|
fingerprint := Fingerprint(ctx, o.Src, true)
|
|
data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal data JSON: %w", err)
|
|
}
|
|
if len(data) < int(ci.MaxResumeCacheSize) {
|
|
// Each remote will have its own directory for cached resume files
|
|
dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = os.MkdirAll(dirPath, os.ModePerm)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create cache directory %v: %w", dirPath, err)
|
|
}
|
|
// Write resume data to disk
|
|
cachePath := filepath.Join(dirPath, o.Remote)
|
|
cacheFile, err := os.Create(cachePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
|
|
}
|
|
defer func() {
|
|
_ = cacheFile.Close()
|
|
}()
|
|
_, errWrite := cacheFile.Write(data)
|
|
if errWrite != nil {
|
|
return fmt.Errorf("failed to write JSON to file: %w", errWrite)
|
|
}
|
|
}
|
|
if !o.CacheCleaned {
|
|
rootCacheDir := filepath.Join(o.CacheDir, "resume")
|
|
if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
|
|
return fmt.Errorf("failed to clean resume cache: %w", err)
|
|
}
|
|
}
|
|
o.CacheCleaned = true
|
|
return nil
|
|
}
|
|
|
|
// ResumeJSON is a struct for storing resume info in cache
|
|
type ResumeJSON struct {
|
|
Fingerprint string `json:"fprint"`
|
|
ID string `json:"id"`
|
|
HashName string `json:"hname"`
|
|
HashState string `json:"hstate"`
|
|
}
|
|
|
|
func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
|
|
resumedata := ResumeJSON{
|
|
Fingerprint: fprint,
|
|
ID: id,
|
|
HashName: hashName,
|
|
HashState: hashState,
|
|
}
|
|
data, err := json.Marshal(&resumedata)
|
|
return data, err
|
|
}
|
|
|
|
// cleanCache checks the size of the resume cache and removes the oldest resume files if more than limit
|
|
func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
|
|
ci := GetConfig(ctx)
|
|
var paths []string
|
|
pathsWithInfo := make(map[string]os.FileInfo)
|
|
totalCacheSize := int64(0)
|
|
walkErr := filepath.Walk(rootCacheDir,
|
|
func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if info.IsDir() {
|
|
// Empty subdirectories in the resume cache dir can be removed
|
|
removeErr := os.Remove(path)
|
|
if err != nil && !os.IsNotExist(removeErr) {
|
|
return fmt.Errorf("failed to remove empty subdirectory: %s: %w", path, err)
|
|
}
|
|
return nil
|
|
}
|
|
paths = append(paths, path)
|
|
pathsWithInfo[path] = info
|
|
totalCacheSize += info.Size()
|
|
return nil
|
|
})
|
|
if walkErr != nil {
|
|
return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
|
|
}
|
|
if totalCacheSize > int64(ci.MaxResumeCacheSize) {
|
|
sort.Slice(paths, func(i, j int) bool {
|
|
return pathsWithInfo[paths[i]].ModTime().Before(pathsWithInfo[paths[j]].ModTime())
|
|
})
|
|
for _, p := range paths {
|
|
if totalCacheSize < int64(ci.MaxResumeCacheSize) {
|
|
break
|
|
}
|
|
if err := os.Remove(p); err != nil {
|
|
return fmt.Errorf("error removing oldest cache file: %s: %w", p, err)
|
|
}
|
|
totalCacheSize -= pathsWithInfo[p].Size()
|
|
Debugf(p, "Successfully removed oldest cache file")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Header formats the option as an http header
|
|
func (o *OptionResume) Header() (key string, value string) {
|
|
return "", ""
|
|
}
|
|
|
|
// String formats the option into human readable form
|
|
func (o *OptionResume) String() string {
|
|
return fmt.Sprintf("OptionResume(ID:%v, Pos:%v)", o.ID, o.Pos)
|
|
}
|
|
|
|
// Mandatory returns whether the option must be parsed or can be ignored
|
|
func (o *OptionResume) Mandatory() bool {
|
|
return false
|
|
}
|
|
|
|
// NullOption defines an Option which does nothing
|
|
type NullOption struct {
|
|
}
|
|
|
|
// Header formats the option as an http header
|
|
func (o NullOption) Header() (key string, value string) {
|
|
return "", ""
|
|
}
|
|
|
|
// String formats the option into human-readable form
|
|
func (o NullOption) String() string {
|
|
return fmt.Sprintf("NullOption()")
|
|
}
|
|
|
|
// Mandatory returns whether the option must be parsed or can be ignored
|
|
func (o NullOption) Mandatory() bool {
|
|
return false
|
|
}
|
|
|
|
// OpenOptionAddHeaders adds each header found in options to the
|
|
// headers map provided the key was non empty.
|
|
func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) {
|
|
for _, option := range options {
|
|
key, value := option.Header()
|
|
if key != "" && value != "" {
|
|
headers[key] = value
|
|
}
|
|
}
|
|
}
|
|
|
|
// OpenOptionHeaders adds each header found in options to the
|
|
// headers map provided the key was non empty.
|
|
//
|
|
// It returns a nil map if options was empty
|
|
func OpenOptionHeaders(options []OpenOption) (headers map[string]string) {
|
|
if len(options) == 0 {
|
|
return nil
|
|
}
|
|
headers = make(map[string]string, len(options))
|
|
OpenOptionAddHeaders(options, headers)
|
|
return headers
|
|
}
|
|
|
|
// OpenOptionAddHTTPHeaders Sets each header found in options to the
|
|
// http.Header map provided the key was non empty.
|
|
func OpenOptionAddHTTPHeaders(headers http.Header, options []OpenOption) {
|
|
for _, option := range options {
|
|
key, value := option.Header()
|
|
if key != "" && value != "" {
|
|
headers.Set(key, value)
|
|
}
|
|
}
|
|
}
|