forked from TrueCloudLab/rclone
swift: implement --swift-use-segments-container to allow >5G files on Blomp
This switches between storing chunks in a separate container suffixed with `_segments` (the default) and a directory in the root `.file-segments`) By default the `.file-segments` mode will be auto selected if `auth_url`s that require it are detected. If the `.file-segments` mode is in use then rclone will omit that directory from listings. See: https://forum.rclone.org/t/blomp-unable-to-upload-5gb-files/42498/
This commit is contained in:
parent
8a18c29835
commit
5bf70c68f1
3 changed files with 225 additions and 202 deletions
|
@ -9,11 +9,12 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ncw/swift/v2"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
"github.com/rclone/rclone/lib/bucket"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
)
|
||||
|
||||
|
@ -37,29 +39,42 @@ const (
|
|||
listChunks = 1000 // chunk size to read directory listings
|
||||
defaultChunkSize = 5 * fs.Gibi
|
||||
minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
|
||||
segmentsContainerSuffix = "_segments"
|
||||
segmentsDirectory = ".file-segments"
|
||||
segmentsDirectorySlash = segmentsDirectory + "/"
|
||||
)
|
||||
|
||||
// Auth URLs which imply using fileSegmentsDirectory
|
||||
var needFileSegmentsDirectory = regexp.MustCompile(`(?s)\.(ain?\.net|blomp\.com|praetector\.com|signmy\.name|rackfactory\.com)($|/)`)
|
||||
|
||||
// SharedOptions are shared between swift and backends which depend on swift
|
||||
var SharedOptions = []fs.Option{{
|
||||
Name: "chunk_size",
|
||||
Help: `Above this size files will be chunked into a _segments container.
|
||||
Help: strings.ReplaceAll(`Above this size files will be chunked.
|
||||
|
||||
Above this size files will be chunked into a _segments container. The
|
||||
default for this is 5 GiB which is its maximum value.`,
|
||||
Above this size files will be chunked into a a |`+segmentsContainerSuffix+`| container
|
||||
or a |`+segmentsDirectory+`| directory. (See the |use_segments_container| option
|
||||
for more info). Default for this is 5 GiB which is its maximum value, which
|
||||
means only files above this size will be chunked.
|
||||
|
||||
Rclone uploads chunked files as dynamic large objects (DLO).
|
||||
`, "|", "`"),
|
||||
Default: defaultChunkSize,
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "no_chunk",
|
||||
Help: `Don't chunk files during streaming upload.
|
||||
Help: strings.ReplaceAll(`Don't chunk files during streaming upload.
|
||||
|
||||
When doing streaming uploads (e.g. using rcat or mount) setting this
|
||||
flag will cause the swift backend to not upload chunked files.
|
||||
When doing streaming uploads (e.g. using |rcat| or |mount| with
|
||||
|--vfs-cache-mode off|) setting this flag will cause the swift backend
|
||||
to not upload chunked files.
|
||||
|
||||
This will limit the maximum upload size to 5 GiB. However non chunked
|
||||
files are easier to deal with and have an MD5SUM.
|
||||
This will limit the maximum streamed upload size to 5 GiB. This is
|
||||
useful because non chunked files are easier to deal with and have an
|
||||
MD5SUM.
|
||||
|
||||
Rclone will still chunk files bigger than chunk_size when doing normal
|
||||
copy operations.`,
|
||||
Rclone will still chunk files bigger than |chunk_size| when doing
|
||||
normal copy operations.`, "|", "`"),
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
}, {
|
||||
|
@ -67,11 +82,12 @@ copy operations.`,
|
|||
Help: strings.ReplaceAll(`Disable support for static and dynamic large objects
|
||||
|
||||
Swift cannot transparently store files bigger than 5 GiB. There are
|
||||
two schemes for doing that, static or dynamic large objects, and the
|
||||
API does not allow rclone to determine whether a file is a static or
|
||||
dynamic large object without doing a HEAD on the object. Since these
|
||||
need to be treated differently, this means rclone has to issue HEAD
|
||||
requests for objects for example when reading checksums.
|
||||
two schemes for chunking large files, static large objects (SLO) or
|
||||
dynamic large objects (DLO), and the API does not allow rclone to
|
||||
determine whether a file is a static or dynamic large object without
|
||||
doing a HEAD on the object. Since these need to be treated
|
||||
differently, this means rclone has to issue HEAD requests for objects
|
||||
for example when reading checksums.
|
||||
|
||||
When |no_large_objects| is set, rclone will assume that there are no
|
||||
static or dynamic large objects stored. This means it can stop doing
|
||||
|
@ -82,12 +98,40 @@ Setting this option implies |no_chunk| and also that no files will be
|
|||
uploaded in chunks, so files bigger than 5 GiB will just fail on
|
||||
upload.
|
||||
|
||||
If you set this option and there *are* static or dynamic large objects,
|
||||
If you set this option and there **are** static or dynamic large objects,
|
||||
then this will give incorrect hashes for them. Downloads will succeed,
|
||||
but other operations such as Remove and Copy will fail.
|
||||
`, "|", "`"),
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "use_segments_container",
|
||||
Help: strings.ReplaceAll(`Choose destination for large object segments
|
||||
|
||||
Swift cannot transparently store files bigger than 5 GiB and rclone
|
||||
will chunk files larger than |chunk_size| (default 5 GiB) in order to
|
||||
upload them.
|
||||
|
||||
If this value is |true| the chunks will be stored in an additional
|
||||
container named the same as the destination container but with
|
||||
|`+segmentsContainerSuffix+`| appended. This means that there won't be any duplicated
|
||||
data in the original container but having another container may not be
|
||||
acceptable.
|
||||
|
||||
If this value is |false| the chunks will be stored in a
|
||||
|`+segmentsDirectory+`| directory in the root of the container. This
|
||||
directory will be omitted when listing the container. Some
|
||||
providers (eg Blomp) require this mode as creating additional
|
||||
containers isn't allowed. If it is desired to see the |`+segmentsDirectory+`|
|
||||
directory in the root then this flag must be set to |true|.
|
||||
|
||||
If this value is |unset| (the default), then rclone will choose the value
|
||||
to use. It will be |false| unless rclone detects any |auth_url|s that
|
||||
it knows need it to be |true|. In this case you'll see a message in
|
||||
the DEBUG log.
|
||||
`, "|", "`"),
|
||||
Default: fs.Tristate{},
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: config.ConfigEncoding,
|
||||
Help: config.ConfigEncodingHelp,
|
||||
|
@ -262,6 +306,7 @@ type Options struct {
|
|||
ChunkSize fs.SizeSuffix `config:"chunk_size"`
|
||||
NoChunk bool `config:"no_chunk"`
|
||||
NoLargeObjects bool `config:"no_large_objects"`
|
||||
UseSegmentsContainer fs.Tristate `config:"use_segments_container"`
|
||||
Enc encoder.MultiEncoder `config:"encoding"`
|
||||
}
|
||||
|
||||
|
@ -506,6 +551,11 @@ func NewFsWithConnection(ctx context.Context, opt *Options, name, root string, c
|
|||
BucketBasedRootOK: true,
|
||||
SlowModTime: true,
|
||||
}).Fill(ctx, f)
|
||||
if !f.opt.UseSegmentsContainer.Valid {
|
||||
f.opt.UseSegmentsContainer.Value = !needFileSegmentsDirectory.MatchString(opt.Auth)
|
||||
f.opt.UseSegmentsContainer.Valid = true
|
||||
fs.Debugf(f, "Auto set use_segments_container to %v", f.opt.UseSegmentsContainer.Value)
|
||||
}
|
||||
if f.rootContainer != "" && f.rootDirectory != "" {
|
||||
// Check to see if the object exists - ignoring directory markers
|
||||
var info swift.Object
|
||||
|
@ -627,6 +677,10 @@ func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix
|
|||
if err == nil {
|
||||
for i := range objects {
|
||||
object := &objects[i]
|
||||
if !includeDirMarkers && !f.opt.UseSegmentsContainer.Value && (object.Name == segmentsDirectory || strings.HasPrefix(object.Name, segmentsDirectorySlash)) {
|
||||
// Don't show segments in listing unless showing directory markers
|
||||
continue
|
||||
}
|
||||
isDirectory := false
|
||||
if !recurse {
|
||||
isDirectory = strings.HasSuffix(object.Name, "/")
|
||||
|
@ -965,8 +1019,8 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||
return nil, err
|
||||
}
|
||||
if isLargeObject {
|
||||
/*handle large object*/
|
||||
err = copyLargeObject(ctx, f, srcObj, dstContainer, dstPath)
|
||||
// handle large object
|
||||
err = f.copyLargeObject(ctx, srcObj, dstContainer, dstPath)
|
||||
} else {
|
||||
srcContainer, srcPath := srcObj.split()
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
|
@ -981,102 +1035,124 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||
return f.NewObject(ctx, remote)
|
||||
}
|
||||
|
||||
func copyLargeObject(ctx context.Context, f *Fs, src *Object, dstContainer string, dstPath string) error {
|
||||
segmentsContainer := dstContainer + "_segments"
|
||||
err := f.makeContainer(ctx, segmentsContainer)
|
||||
// Represents a segmented upload or copy
|
||||
type segmentedUpload struct {
|
||||
f *Fs // parent
|
||||
dstContainer string // container for the file to live once uploaded
|
||||
container string // container for the segments
|
||||
dstPath string // path for the object to live once uploaded
|
||||
path string // unique path for the segments
|
||||
mu sync.Mutex // protects the variables below
|
||||
segments []string // segments successfully uploaded
|
||||
}
|
||||
|
||||
// Create a new segmented upload using the correct container and path
|
||||
func (f *Fs) newSegmentedUpload(ctx context.Context, dstContainer string, dstPath string) (su *segmentedUpload, err error) {
|
||||
randomString, err := random.Password(128)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, fmt.Errorf("failed to create random string for upload: %w", err)
|
||||
}
|
||||
segments, err := src.getSegmentsLargeObject(ctx)
|
||||
uniqueString := time.Now().Format("2006-01-02-150405") + "-" + randomString
|
||||
su = &segmentedUpload{
|
||||
f: f,
|
||||
dstPath: dstPath,
|
||||
path: dstPath + "/" + uniqueString,
|
||||
dstContainer: dstContainer,
|
||||
container: dstContainer,
|
||||
}
|
||||
if f.opt.UseSegmentsContainer.Value {
|
||||
su.container += segmentsContainerSuffix
|
||||
err = f.makeContainer(ctx, su.container)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
su.path = segmentsDirectorySlash + su.path
|
||||
}
|
||||
return su, nil
|
||||
}
|
||||
|
||||
// Return the path of the i-th segment
|
||||
func (su *segmentedUpload) segmentPath(i int) string {
|
||||
return fmt.Sprintf("%s/%08d", su.path, i)
|
||||
}
|
||||
|
||||
// Mark segment as successfully uploaded
|
||||
func (su *segmentedUpload) uploaded(segment string) {
|
||||
su.mu.Lock()
|
||||
defer su.mu.Unlock()
|
||||
su.segments = append(su.segments, segment)
|
||||
}
|
||||
|
||||
// Return the full path including the container
|
||||
func (su *segmentedUpload) fullPath() string {
|
||||
return fmt.Sprintf("%s/%s", su.container, su.path)
|
||||
}
|
||||
|
||||
// Remove segments when upload/copy process fails
|
||||
func (su *segmentedUpload) onFail() {
|
||||
f := su.f
|
||||
if f.opt.LeavePartsOnError {
|
||||
return
|
||||
}
|
||||
ctx := context.Background()
|
||||
fs.Debugf(f, "Segment operation failed: bulk deleting failed segments")
|
||||
if len(su.container) == 0 {
|
||||
fs.Debugf(f, "Invalid segments container")
|
||||
return
|
||||
}
|
||||
if len(su.segments) == 0 {
|
||||
fs.Debugf(f, "No segments to delete")
|
||||
return
|
||||
}
|
||||
_, err := f.c.BulkDelete(ctx, su.container, su.segments)
|
||||
if err != nil {
|
||||
return err
|
||||
fs.Errorf(f, "Failed to bulk delete failed segments: %v", err)
|
||||
}
|
||||
if len(segments) == 0 {
|
||||
return errors.New("could not copy object, list segments are empty")
|
||||
}
|
||||
nanoSeconds := time.Now().Nanosecond()
|
||||
prefixSegment := fmt.Sprintf("%v/%v/%s", nanoSeconds, src.size, strings.ReplaceAll(uuid.New().String(), "-", ""))
|
||||
copiedSegmentsLen := 10
|
||||
for _, value := range segments {
|
||||
if len(value) <= 0 {
|
||||
continue
|
||||
}
|
||||
fragment := value[0]
|
||||
if len(fragment) <= 0 {
|
||||
continue
|
||||
}
|
||||
copiedSegmentsLen = len(value)
|
||||
firstIndex := strings.Index(fragment, "/")
|
||||
if firstIndex < 0 {
|
||||
firstIndex = 0
|
||||
} else {
|
||||
firstIndex = firstIndex + 1
|
||||
}
|
||||
lastIndex := strings.LastIndex(fragment, "/")
|
||||
if lastIndex < 0 {
|
||||
lastIndex = len(fragment)
|
||||
} else {
|
||||
lastIndex = lastIndex - 1
|
||||
}
|
||||
prefixSegment = fragment[firstIndex:lastIndex]
|
||||
break
|
||||
}
|
||||
copiedSegments := make([]string, copiedSegmentsLen)
|
||||
defer handleCopyFail(ctx, f, segmentsContainer, copiedSegments, err)
|
||||
for c, ss := range segments {
|
||||
if len(ss) <= 0 {
|
||||
continue
|
||||
}
|
||||
for _, s := range ss {
|
||||
lastIndex := strings.LastIndex(s, "/")
|
||||
if lastIndex <= 0 {
|
||||
lastIndex = 0
|
||||
} else {
|
||||
lastIndex = lastIndex + 1
|
||||
}
|
||||
segmentName := dstPath + "/" + prefixSegment + "/" + s[lastIndex:]
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
var rxHeaders swift.Headers
|
||||
rxHeaders, err = f.c.ObjectCopy(ctx, c, s, segmentsContainer, segmentName, nil)
|
||||
copiedSegments = append(copiedSegments, segmentName)
|
||||
return shouldRetryHeaders(ctx, rxHeaders, err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
m := swift.Metadata{}
|
||||
headers := m.ObjectHeaders()
|
||||
headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s/%s", segmentsContainer, dstPath, prefixSegment))
|
||||
headers["Content-Length"] = "0"
|
||||
}
|
||||
|
||||
// upload the manifest when upload is done
|
||||
func (su *segmentedUpload) uploadManifest(ctx context.Context, contentType string, headers swift.Headers) (err error) {
|
||||
delete(headers, "Etag") // remove Etag if present as it is wrong for the manifest
|
||||
headers["X-Object-Manifest"] = urlEncode(su.fullPath())
|
||||
headers["Content-Length"] = "0" // set Content-Length as we know it
|
||||
emptyReader := bytes.NewReader(nil)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
fs.Debugf(su.f, "uploading manifest %q to %q", su.dstPath, su.dstContainer)
|
||||
err = su.f.pacer.Call(func() (bool, error) {
|
||||
var rxHeaders swift.Headers
|
||||
rxHeaders, err = f.c.ObjectPut(ctx, dstContainer, dstPath, emptyReader, true, "", src.contentType, headers)
|
||||
rxHeaders, err = su.f.c.ObjectPut(ctx, su.dstContainer, su.dstPath, emptyReader, true, "", contentType, headers)
|
||||
return shouldRetryHeaders(ctx, rxHeaders, err)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// remove copied segments when copy process failed
|
||||
func handleCopyFail(ctx context.Context, f *Fs, segmentsContainer string, segments []string, err error) {
|
||||
fs.Debugf(f, "handle copy segment fail")
|
||||
if err == nil {
|
||||
return
|
||||
// Copy a large object src into (dstContainer, dstPath)
|
||||
func (f *Fs) copyLargeObject(ctx context.Context, src *Object, dstContainer string, dstPath string) (err error) {
|
||||
su, err := f.newSegmentedUpload(ctx, dstContainer, dstPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(segmentsContainer) == 0 {
|
||||
fs.Debugf(f, "invalid segments container")
|
||||
return
|
||||
srcSegmentsContainer, srcSegments, err := src.getSegmentsLargeObject(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("copy large object: %w", err)
|
||||
}
|
||||
if len(segments) == 0 {
|
||||
fs.Debugf(f, "segments is empty")
|
||||
return
|
||||
if len(srcSegments) == 0 {
|
||||
return errors.New("could not copy object, list segments are empty")
|
||||
}
|
||||
fs.Debugf(f, "action delete segments what copied")
|
||||
for _, v := range segments {
|
||||
_ = f.c.ObjectDelete(ctx, segmentsContainer, v)
|
||||
defer atexit.OnError(&err, su.onFail)()
|
||||
for i, srcSegment := range srcSegments {
|
||||
dstSegment := su.segmentPath(i)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
var rxHeaders swift.Headers
|
||||
rxHeaders, err = f.c.ObjectCopy(ctx, srcSegmentsContainer, srcSegment, su.container, dstSegment, nil)
|
||||
return shouldRetryHeaders(ctx, rxHeaders, err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
su.uploaded(dstSegment)
|
||||
}
|
||||
return su.uploadManifest(ctx, src.contentType, src.headers)
|
||||
}
|
||||
|
||||
// Hashes returns the supported hash sets.
|
||||
|
@ -1308,35 +1384,30 @@ func min(x, y int64) int64 {
|
|||
return y
|
||||
}
|
||||
|
||||
func (o *Object) getSegmentsLargeObject(ctx context.Context) (map[string][]string, error) {
|
||||
// Get the segments for a large object
|
||||
//
|
||||
// It returns the names of the segments and the container that they live in
|
||||
func (o *Object) getSegmentsLargeObject(ctx context.Context) (container string, segments []string, err error) {
|
||||
container, objectName := o.split()
|
||||
segmentContainer, segmentObjects, err := o.fs.c.LargeObjectGetSegments(ctx, container, objectName)
|
||||
container, segmentObjects, err := o.fs.c.LargeObjectGetSegments(ctx, container, objectName)
|
||||
if err != nil {
|
||||
fs.Debugf(o, "Failed to get list segments of object: %v", err)
|
||||
return nil, err
|
||||
return container, segments, fmt.Errorf("failed to get list segments of object: %w", err)
|
||||
}
|
||||
var containerSegments = make(map[string][]string)
|
||||
for _, segment := range segmentObjects {
|
||||
if _, ok := containerSegments[segmentContainer]; !ok {
|
||||
containerSegments[segmentContainer] = make([]string, 0, len(segmentObjects))
|
||||
}
|
||||
segments := containerSegments[segmentContainer]
|
||||
segments = append(segments, segment.Name)
|
||||
containerSegments[segmentContainer] = segments
|
||||
segments = make([]string, len(segmentObjects))
|
||||
for i := range segmentObjects {
|
||||
segments[i] = segmentObjects[i].Name
|
||||
}
|
||||
return containerSegments, nil
|
||||
return container, segments, nil
|
||||
}
|
||||
|
||||
func (o *Object) removeSegmentsLargeObject(ctx context.Context, containerSegments map[string][]string) error {
|
||||
if containerSegments == nil || len(containerSegments) <= 0 {
|
||||
// Remove the segments for a large object
|
||||
func (o *Object) removeSegmentsLargeObject(ctx context.Context, container string, segments []string) error {
|
||||
if len(segments) == 0 {
|
||||
return nil
|
||||
}
|
||||
for container, segments := range containerSegments {
|
||||
_, err := o.fs.c.BulkDelete(ctx, container, segments)
|
||||
if err != nil {
|
||||
fs.Debugf(o, "Failed to delete bulk segments %v", err)
|
||||
return err
|
||||
}
|
||||
_, err := o.fs.c.BulkDelete(ctx, container, segments)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete bulk segments: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1359,55 +1430,25 @@ func urlEncode(str string) string {
|
|||
}
|
||||
|
||||
// updateChunks updates the existing object using chunks to a separate
|
||||
// container. It returns a string which prefixes current segments.
|
||||
func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
|
||||
// container.
|
||||
func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.Headers, size int64, contentType string) (err error) {
|
||||
container, containerPath := o.split()
|
||||
segmentsContainer := container + "_segments"
|
||||
// Create the segmentsContainer if it doesn't exist
|
||||
var err error
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
var rxHeaders swift.Headers
|
||||
_, rxHeaders, err = o.fs.c.Container(ctx, segmentsContainer)
|
||||
return shouldRetryHeaders(ctx, rxHeaders, err)
|
||||
})
|
||||
if err == swift.ContainerNotFound {
|
||||
headers := swift.Headers{}
|
||||
if o.fs.opt.StoragePolicy != "" {
|
||||
headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy
|
||||
}
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
err = o.fs.c.ContainerCreate(ctx, segmentsContainer, headers)
|
||||
return shouldRetry(ctx, err)
|
||||
})
|
||||
}
|
||||
su, err := o.fs.newSegmentedUpload(ctx, container, containerPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return err
|
||||
}
|
||||
// Upload the chunks
|
||||
left := size
|
||||
i := 0
|
||||
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
|
||||
segmentsPath := path.Join(containerPath, uniquePrefix)
|
||||
in := bufio.NewReader(in0)
|
||||
segmentInfos := make([]string, 0, (size/int64(o.fs.opt.ChunkSize))+1)
|
||||
defer atexit.OnError(&err, func() {
|
||||
if o.fs.opt.LeavePartsOnError {
|
||||
return
|
||||
}
|
||||
fs.Debugf(o, "Delete segments when err raise %v", err)
|
||||
if len(segmentInfos) == 0 {
|
||||
return
|
||||
}
|
||||
_ctx := context.Background()
|
||||
deleteChunks(_ctx, o, segmentsContainer, segmentInfos)
|
||||
})()
|
||||
defer atexit.OnError(&err, su.onFail)()
|
||||
for {
|
||||
// can we read at least one byte?
|
||||
if _, err = in.Peek(1); err != nil {
|
||||
if left > 0 {
|
||||
return "", err // read less than expected
|
||||
return err // read less than expected
|
||||
}
|
||||
fs.Debugf(o, "Uploading segments into %q seems done (%v)", segmentsContainer, err)
|
||||
fs.Debugf(o, "Uploading segments into %q seems done (%v)", su.container, err)
|
||||
break
|
||||
}
|
||||
n := int64(o.fs.opt.ChunkSize)
|
||||
|
@ -1417,49 +1458,20 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.
|
|||
left -= n
|
||||
}
|
||||
segmentReader := io.LimitReader(in, n)
|
||||
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
|
||||
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, segmentsContainer)
|
||||
segmentPath := su.segmentPath(i)
|
||||
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, su.container)
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
var rxHeaders swift.Headers
|
||||
rxHeaders, err = o.fs.c.ObjectPut(ctx, segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
||||
if err == nil {
|
||||
segmentInfos = append(segmentInfos, segmentPath)
|
||||
}
|
||||
rxHeaders, err = o.fs.c.ObjectPut(ctx, su.container, segmentPath, segmentReader, true, "", "", headers)
|
||||
return shouldRetryHeaders(ctx, rxHeaders, err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
return err
|
||||
}
|
||||
su.uploaded(segmentPath)
|
||||
i++
|
||||
}
|
||||
// Upload the manifest
|
||||
headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s", segmentsContainer, segmentsPath))
|
||||
headers["Content-Length"] = "0" // set Content-Length as we know it
|
||||
emptyReader := bytes.NewReader(nil)
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
var rxHeaders swift.Headers
|
||||
rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, emptyReader, true, "", contentType, headers)
|
||||
return shouldRetryHeaders(ctx, rxHeaders, err)
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
//reset data
|
||||
segmentInfos = nil
|
||||
}
|
||||
return uniquePrefix + "/", err
|
||||
}
|
||||
|
||||
func deleteChunks(ctx context.Context, o *Object, segmentsContainer string, segmentInfos []string) {
|
||||
if len(segmentInfos) == 0 {
|
||||
return
|
||||
}
|
||||
for _, v := range segmentInfos {
|
||||
fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer)
|
||||
e := o.fs.c.ObjectDelete(ctx, segmentsContainer, v)
|
||||
if e != nil {
|
||||
fs.Errorf(o, "Error occurred in delete segment file %q on %q, error: %q", v, segmentsContainer, e)
|
||||
}
|
||||
}
|
||||
return su.uploadManifest(ctx, contentType, headers)
|
||||
}
|
||||
|
||||
// Update the object with the contents of the io.Reader, modTime and size
|
||||
|
@ -1483,10 +1495,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
return err
|
||||
}
|
||||
|
||||
//capture segments before upload
|
||||
var segmentsContainer map[string][]string
|
||||
// Capture segments before upload
|
||||
var segmentsContainer string
|
||||
var segments []string
|
||||
if isLargeObject {
|
||||
segmentsContainer, _ = o.getSegmentsLargeObject(ctx)
|
||||
segmentsContainer, segments, _ = o.getSegmentsLargeObject(ctx)
|
||||
}
|
||||
|
||||
// Set the mtime
|
||||
|
@ -1497,7 +1510,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
fs.OpenOptionAddHeaders(options, headers)
|
||||
|
||||
if (size > int64(o.fs.opt.ChunkSize) || (size == -1 && !o.fs.opt.NoChunk)) && !o.fs.opt.NoLargeObjects {
|
||||
_, err = o.updateChunks(ctx, in, headers, size, contentType)
|
||||
err = o.updateChunks(ctx, in, headers, size, contentType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1535,7 +1548,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
if isLargeObject && len(segmentsContainer) > 0 {
|
||||
isInContainerVersioning, _ := o.isInContainerVersioning(ctx, container)
|
||||
if !isInContainerVersioning {
|
||||
err := o.removeSegmentsLargeObject(ctx, segmentsContainer)
|
||||
err := o.removeSegmentsLargeObject(ctx, segmentsContainer, segments)
|
||||
if err != nil {
|
||||
fs.Logf(o, "Failed to remove old segments - carrying on with upload: %v", err)
|
||||
}
|
||||
|
@ -1563,10 +1576,11 @@ func (o *Object) Remove(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
}
|
||||
//capture segments object if this object is large object
|
||||
var containerSegments map[string][]string
|
||||
// Capture segments object if this object is large object
|
||||
var segmentsContainer string
|
||||
var segments []string
|
||||
if isLargeObject {
|
||||
containerSegments, err = o.getSegmentsLargeObject(ctx)
|
||||
segmentsContainer, segments, err = o.getSegmentsLargeObject(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1589,7 +1603,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
|
|||
}
|
||||
|
||||
if isLargeObject {
|
||||
return o.removeSegmentsLargeObject(ctx, containerSegments)
|
||||
return o.removeSegmentsLargeObject(ctx, segmentsContainer, segments)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -118,6 +118,9 @@ func (f *Fs) testWithChunkFail(t *testing.T) {
|
|||
f.opt.NoChunk = false
|
||||
f.opt.ChunkSize = 1024 * fs.SizeSuffixBase
|
||||
segmentContainer := f.root + "_segments"
|
||||
if !f.opt.UseSegmentsContainer.Value {
|
||||
segmentContainer = f.root
|
||||
}
|
||||
defer func() {
|
||||
//restore config
|
||||
f.opt.ChunkSize = preConfChunkSize
|
||||
|
@ -147,6 +150,9 @@ func (f *Fs) testWithChunkFail(t *testing.T) {
|
|||
_, _, err = f.c.Object(ctx, f.rootContainer, path)
|
||||
assert.Equal(t, swift.ObjectNotFound, err)
|
||||
prefix := path
|
||||
if !f.opt.UseSegmentsContainer.Value {
|
||||
prefix = segmentsDirectory + "/" + prefix
|
||||
}
|
||||
objs, err := f.c.Objects(ctx, segmentContainer, &swift.ObjectsOpts{
|
||||
Prefix: prefix,
|
||||
})
|
||||
|
|
|
@ -278,6 +278,9 @@ backends:
|
|||
- backend: "swift"
|
||||
remote: "TestSwiftAIO:"
|
||||
fastlist: true
|
||||
- backend: "swift"
|
||||
remote: "TestSwiftAIO,use_segments_container=false:"
|
||||
fastlist: true
|
||||
- backend: "swift"
|
||||
remote: "TestSwift:"
|
||||
fastlist: true
|
||||
|
|
Loading…
Reference in a new issue