rclone/fs/open_options.go
Ivan Andreev e65e046c21 local: improve atexit handler for resume
- report errors if any
- prevent double invocation
- prefer saved hash type when many are supported
2021-11-17 20:33:22 +03:00

430 lines
12 KiB
Go

// Options for Open
package fs
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/cacheroot"
)
// OpenOption is the interface implemented by every option which can
// be passed to Open. Besides the methods listed here an option must
// also provide the String method of fmt.Stringer.
type OpenOption interface {
	fmt.Stringer

	// Mandatory returns whether the option must be parsed or can be ignored
	Mandatory() bool

	// Header returns the option as an HTTP header
	Header() (key, value string)
}
// RangeOption describes an HTTP Range request by its Start and End
// offsets. A Start or End which is < 0 is left out of the request.
//
// End is inclusive, so 100 bytes are fetched with
// RangeOption{Start: 0, End: 99}. End may be bigger than the Size of
// the object in which case it will be capped to the size of the
// object.
//
// When Start is given without End the fetch runs from Start to the
// end of the file.
//
// When End is given without Start the fetch returns the last End
// bytes.
//
// Examples:
//
//	RangeOption{Start: 0, End: 99}    - fetch the first 100 bytes
//	RangeOption{Start: 100, End: 199} - fetch the second 100 bytes
//	RangeOption{Start: 100, End: -1}  - fetch bytes from offset 100 to the end
//	RangeOption{Start: -1, End: 100}  - fetch the last 100 bytes
//
// A RangeOption implements a single byte-range-spec from
// https://tools.ietf.org/html/rfc7233#section-2.1
type RangeOption struct {
	Start, End int64
}
// Header formats the option as an http header
//
// The key is always "Range". The value is "bytes=" followed by Start,
// a "-" and End, where a negative Start or End is rendered as the
// empty string.
func (o *RangeOption) Header() (key string, value string) {
	var first, last string
	if o.Start >= 0 {
		first = strconv.FormatInt(o.Start, 10)
	}
	if o.End >= 0 {
		last = strconv.FormatInt(o.End, 10)
	}
	return "Range", "bytes=" + first + "-" + last
}
// ParseRangeOption parses a RangeOption from the value of a Range:
// header, e.g. "bytes=100-199".
//
// Only a single range is accepted. A missing start or end is returned
// as -1 in the corresponding field. White space around the start and
// the end is ignored. An error is returned if the header doesn't start
// with "bytes=", contains a ',', has no '-', or if the start or end is
// not a non-negative decimal integer.
func ParseRangeOption(s string) (po *RangeOption, err error) {
	const preamble = "bytes="
	if !strings.HasPrefix(s, preamble) {
		return nil, errors.New("Range: header invalid: doesn't start with " + preamble)
	}
	spec := strings.TrimPrefix(s, preamble)
	if strings.Contains(spec, ",") {
		return nil, errors.New("Range: header invalid: contains multiple ranges which isn't supported")
	}
	dash := strings.Index(spec, "-")
	if dash < 0 {
		return nil, errors.New("Range: header invalid: contains no '-'")
	}
	// parseBound converts one side of the range, -1 standing for "not given"
	parseBound := func(field, failure string) (int64, error) {
		field = strings.TrimSpace(field)
		if field == "" {
			return -1, nil
		}
		n, parseErr := strconv.ParseInt(field, 10, 64)
		if parseErr != nil || n < 0 {
			return 0, errors.New(failure)
		}
		return n, nil
	}
	first, err := parseBound(spec[:dash], "Range: header invalid: bad start")
	if err != nil {
		return nil, err
	}
	last, err := parseBound(spec[dash+1:], "Range: header invalid: bad end")
	if err != nil {
		return nil, err
	}
	return &RangeOption{Start: first, End: last}, nil
}
// String formats the option into human-readable form, showing Start
// and End as decimal numbers.
func (o *RangeOption) String() string {
	const layout = "RangeOption(%d,%d)"
	return fmt.Sprintf(layout, o.Start, o.End)
}
// Mandatory returns whether the option must be parsed or can be
// ignored. A RangeOption always has to be parsed.
func (o *RangeOption) Mandatory() bool {
	const mandatory = true
	return mandatory
}
// Decode interprets the RangeOption into an offset and a limit
//
// The offset is the start of the stream and the limit is how many
// bytes should be read from it. If the limit is -1 then the stream
// should be read to the end.
//
// size is the size of the object and is only used when Start < 0 and
// End >= 0, i.e. when the last End bytes are requested. If more bytes
// are requested than the object holds then the offset is 0 so that
// the whole object is read rather than returning a negative offset.
func (o *RangeOption) Decode(size int64) (offset, limit int64) {
	switch {
	case o.Start >= 0 && o.End >= 0:
		// End is inclusive, hence the +1
		return o.Start, o.End - o.Start + 1
	case o.Start >= 0:
		return o.Start, -1
	case o.End >= 0:
		offset = size - o.End
		// Only clamp when the size is known (>= 0), a negative
		// size is passed through as before
		if size >= 0 && offset < 0 {
			offset = 0
		}
		return offset, -1
	default:
		// Neither Start nor End given - read everything
		return 0, -1
	}
}
// FixRangeOption looks through the slice of options and adjusts any
// RangeOptions found that request a fetch from the end into an
// absolute fetch using the size passed in and makes sure the range does
// not exceed filesize. Some remotes (e.g. Onedrive, Box) don't support
// range requests which index from the end.
//
// The options slice is modified in place. Adjusted entries are
// replaced with new RangeOptions, the RangeOptions passed in are not
// themselves modified.
//
//   - If size < 0 (unknown) the options are left alone as there is
//     nothing to make the ranges absolute against.
//   - If size == 0 every RangeOption is replaced with a NullOption.
//   - A fetch of the last End bytes becomes a fetch from size-End, or
//     from 0 if End is bigger than size or isn't set.
//   - An End at or past size is capped to size-1 (End is inclusive).
func FixRangeOption(options []OpenOption, size int64) {
	if size < 0 {
		// Can't do anything for objects of unknown length
		return
	}
	if size == 0 {
		// if size 0 then remove the RangeOptions, replacing them
		// with NullOptions which won't be rendered
		for i := range options {
			if _, ok := options[i].(*RangeOption); ok {
				options[i] = NullOption{}
			}
		}
		return
	}
	for i, option := range options {
		x, ok := option.(*RangeOption)
		if !ok {
			continue
		}
		// If start is < 0 then fetch from the end
		if x.Start < 0 {
			start := int64(0)
			if x.End >= 0 && x.End < size {
				start = size - x.End
			}
			x = &RangeOption{Start: start, End: -1}
			options[i] = x
		}
		// End is inclusive so the last valid value is size-1
		if x.End >= size {
			x = &RangeOption{Start: x.Start, End: size - 1}
			options[i] = x
		}
	}
}
// SeekOption defines an HTTP Range option which has a start offset
// but no end.
type SeekOption struct {
	Offset int64
}

// Header formats the option as an http header
//
// The key is "Range" and the value is an open ended byte range which
// starts at Offset.
func (o *SeekOption) Header() (key string, value string) {
	return "Range", fmt.Sprintf("bytes=%d-", o.Offset)
}

// String formats the option into human-readable form
func (o *SeekOption) String() string {
	const layout = "SeekOption(%d)"
	return fmt.Sprintf(layout, o.Offset)
}

// Mandatory returns whether the option must be parsed or can be
// ignored. A SeekOption always has to be parsed.
func (o *SeekOption) Mandatory() bool {
	const mandatory = true
	return mandatory
}
// HTTPOption defines a general purpose HTTP option made of a header
// Key and its Value.
type HTTPOption struct {
	Key, Value string
}

// Header formats the option as an http header, returning Key and
// Value unchanged.
func (o *HTTPOption) Header() (key string, value string) {
	key, value = o.Key, o.Value
	return key, value
}

// String formats the option into human-readable form with Key and
// Value quoted.
func (o *HTTPOption) String() string {
	const layout = "HTTPOption(%q,%q)"
	return fmt.Sprintf(layout, o.Key, o.Value)
}

// Mandatory returns whether the option must be parsed or can be
// ignored. An HTTPOption can be ignored.
func (o *HTTPOption) Mandatory() bool {
	const mandatory = false
	return mandatory
}
// HashesOption defines an option used to tell the local fs to limit
// the number of hashes it calculates.
type HashesOption struct {
	Hashes hash.Set
}

// Header formats the option as an http header. A HashesOption has no
// HTTP representation so both key and value are empty.
func (o *HashesOption) Header() (key string, value string) {
	key, value = "", ""
	return key, value
}

// String formats the option into human-readable form
func (o *HashesOption) String() string {
	const layout = "HashesOption(%v)"
	return fmt.Sprintf(layout, o.Hashes)
}

// Mandatory returns whether the option must be parsed or can be
// ignored. A HashesOption can be ignored.
func (o *HashesOption) Mandatory() bool {
	const mandatory = false
	return mandatory
}
// OptionResume defines a Put/Upload for doing resumes
//
// Besides describing where to resume from it carries what SetID needs
// in order to store the resume data of an upload in the resume cache.
type OptionResume struct {
ID string // resume this ID if set
Pos int64 // and resume from this position
// NOTE(review): Hash is not referenced by the code in this file -
// presumably a hash belonging to the resumed upload; confirm with
// the callers.
Hash string
// Src is the source object, SetID stores its Fingerprint
Src Object
// F is the Fs whose Name() and Root() SetID uses to pick the cache
// directory for the resume file
F Fs
// Remote is joined onto that cache directory to make the path of
// the resume file
Remote string
// CacheCleaned is set by SetID once it has cleaned the resume
// cache, SetID skips the clean when it is already set
CacheCleaned bool
// CacheDir is the cache directory, the resume cache is kept in its
// "resume" subdirectory
CacheDir string
}
// SetID will be called by backend's Put/Update function if the object's upload
// could be resumed upon failure
//
// SetID takes the passed resume ID, hash name and hash state, adds the
// Fingerprint of the source object and stores them as JSON in a file
// under o.CacheDir so that future Copy operations can resume the upload
// if it fails. The file is only written if the JSON is smaller than
// MaxResumeCacheSize.
//
// The first call on o which gets that far also cleans the resume cache
// and sets o.CacheCleaned so that later calls skip the clean.
//
// It returns an error if the data can't be marshalled, if the cache
// directory or the resume file can't be created, written or closed, or
// if cleaning the resume cache fails.
func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
	ci := GetConfig(ctx)
	// Get the Fingerprint of the src object so that future Copy operations can ensure the
	// object hasn't changed before resuming an upload
	fingerprint := Fingerprint(ctx, o.Src, true)
	data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
	if err != nil {
		return fmt.Errorf("failed to marshal data JSON: %w", err)
	}
	// Compare as int64, the same way cleanResumeCache does, so the
	// limit isn't truncated where int is 32 bits
	if int64(len(data)) < int64(ci.MaxResumeCacheSize) {
		// Each remote will have its own directory for cached resume files
		dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
		if err != nil {
			return err
		}
		cachePath := filepath.Join(dirPath, o.Remote)
		// o.Remote may contain directories so make the parent of the
		// cache file rather than just dirPath
		cacheParent := filepath.Dir(cachePath)
		if err := os.MkdirAll(cacheParent, os.ModePerm); err != nil {
			return fmt.Errorf("failed to create cache directory %v: %w", cacheParent, err)
		}
		// Write resume data to disk
		cacheFile, err := os.Create(cachePath)
		if err != nil {
			return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
		}
		_, errWrite := cacheFile.Write(data)
		// Always close the file, but report the write error first
		// since it is the root cause. A failed Close can mean the
		// data didn't reach the disk so it mustn't be ignored.
		errClose := cacheFile.Close()
		if errWrite != nil {
			return fmt.Errorf("failed to write JSON to file: %w", errWrite)
		}
		if errClose != nil {
			return fmt.Errorf("failed to close cache file %v: %w", cachePath, errClose)
		}
	}
	if !o.CacheCleaned {
		rootCacheDir := filepath.Join(o.CacheDir, "resume")
		if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
			return fmt.Errorf("failed to clean resume cache: %w", err)
		}
		o.CacheCleaned = true
	}
	return nil
}
// ResumeJSON is a struct for storing resume info in cache
//
// It is what marshalResumeJSON encodes as JSON, the tags give the
// (abbreviated) key of each field in the encoded data.
type ResumeJSON struct {
// Fingerprint of the source object
Fingerprint string `json:"fprint"`
// ID of the upload to resume
ID string `json:"id"`
// HashName is the name of the hash
HashName string `json:"hname"`
// HashState is the state of the hash
HashState string `json:"hstate"`
}
// marshalResumeJSON packs the fingerprint, the resume ID, the hash
// name and the hash state into a ResumeJSON and returns it encoded as
// JSON. ctx is not used.
func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
	var info ResumeJSON
	info.Fingerprint = fprint
	info.ID = id
	info.HashName = hashName
	info.HashState = hashState
	return json.Marshal(&info)
}
// cleanResumeCache checks the size of the resume cache and removes the
// oldest resume files if it is more than the limit
//
// It walks rootCacheDir adding up the size of every file found and
// removing any empty directories on the way. If the total is bigger
// than MaxResumeCacheSize then files are removed, least recently
// modified first, until the total no longer exceeds the limit.
//
// A rootCacheDir which doesn't exist is not an error as there is
// nothing to clean. An error is returned if the walk fails for any
// other reason or if a file can't be removed.
func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
	ci := GetConfig(ctx)
	maxCacheSize := int64(ci.MaxResumeCacheSize)

	type cacheEntry struct {
		path string
		info os.FileInfo
	}
	var entries []cacheEntry
	totalCacheSize := int64(0)
	walkErr := filepath.Walk(rootCacheDir,
		func(path string, info os.FileInfo, err error) error {
			if err != nil {
				if os.IsNotExist(err) {
					// Nothing cached yet, or the entry was
					// removed while walking
					return nil
				}
				return err
			}
			if info.IsDir() {
				// Empty subdirectories in the resume cache dir can be
				// removed. os.Remove fails on directories which still
				// have entries, which is the common case and what
				// keeps them safe, so the error is ignored on purpose.
				_ = os.Remove(path)
				return nil
			}
			entries = append(entries, cacheEntry{path: path, info: info})
			totalCacheSize += info.Size()
			return nil
		})
	if walkErr != nil {
		return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
	}
	if totalCacheSize <= maxCacheSize {
		return nil
	}
	// Oldest first
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].info.ModTime().Before(entries[j].info.ModTime())
	})
	for _, entry := range entries {
		// Stop as soon as the cache fits, a cache of exactly the
		// limit is allowed
		if totalCacheSize <= maxCacheSize {
			break
		}
		// A file which has already gone needs no removing
		if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("error removing oldest cache file: %s: %w", entry.path, err)
		}
		totalCacheSize -= entry.info.Size()
		Debugf(entry.path, "Successfully removed oldest cache file")
	}
	return nil
}
// Header formats the option as an http header. An OptionResume has no
// HTTP representation so both key and value are empty.
func (o *OptionResume) Header() (key string, value string) {
	key, value = "", ""
	return key, value
}
// String formats the option into human readable form showing the ID
// and the position to resume from.
func (o *OptionResume) String() string {
	const layout = "OptionResume(ID:%v, Pos:%v)"
	return fmt.Sprintf(layout, o.ID, o.Pos)
}
// Mandatory returns whether the option must be parsed or can be
// ignored. An OptionResume can be ignored.
func (o *OptionResume) Mandatory() bool {
	const mandatory = false
	return mandatory
}
// NullOption defines an Option which does nothing
//
// It has no HTTP representation and can be ignored.
type NullOption struct{}

// Header formats the option as an http header. Both key and value are
// empty for a NullOption.
func (o NullOption) Header() (key string, value string) {
	return "", ""
}

// String formats the option into human-readable form
func (o NullOption) String() string {
	// Nothing to format, so no need to go through fmt.Sprintf
	return "NullOption()"
}

// Mandatory returns whether the option must be parsed or can be
// ignored. A NullOption can be ignored.
func (o NullOption) Mandatory() bool {
	return false
}
// OpenOptionAddHeaders adds the header of each option to the headers
// map provided, skipping any option whose header has an empty key or
// an empty value. An existing entry with the same key is overwritten.
func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) {
	for i := range options {
		k, v := options[i].Header()
		if k == "" || v == "" {
			continue
		}
		headers[k] = v
	}
}
// OpenOptionHeaders makes a new map and fills it with the headers of
// the options using OpenOptionAddHeaders.
//
// It returns a nil map if options was empty
func OpenOptionHeaders(options []OpenOption) (headers map[string]string) {
	if n := len(options); n > 0 {
		headers = make(map[string]string, n)
		OpenOptionAddHeaders(options, headers)
	}
	return headers
}
// OpenOptionAddHTTPHeaders sets the header of each option in the
// http.Header provided, skipping any option whose header has an empty
// key or an empty value. Existing values for the key are replaced.
func OpenOptionAddHTTPHeaders(headers http.Header, options []OpenOption) {
	for i := range options {
		k, v := options[i].Header()
		if k == "" || v == "" {
			continue
		}
		headers.Set(k, v)
	}
}