diff --git a/backend/swift/swift.go b/backend/swift/swift.go index 05bf92773..b8ce57dce 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -24,6 +24,7 @@ import ( "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/walk" + "github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/pacer" @@ -167,6 +168,11 @@ func init() { Help: "Admin", Value: "admin", }}, + }, { + Name: "leave_parts_on_error", + Help: `If true avoid calling abort upload on a failure. It should be set to true for resuming uploads across different sessions.`, + Default: false, + Advanced: true, }, { Name: "storage_policy", Help: `The storage policy to use when creating a new container @@ -208,6 +214,7 @@ type Options struct { ApplicationCredentialID string `config:"application_credential_id"` ApplicationCredentialName string `config:"application_credential_name"` ApplicationCredentialSecret string `config:"application_credential_secret"` + LeavePartsOnError bool `config:"leave_parts_on_error"` StoragePolicy string `config:"storage_policy"` EndpointType string `config:"endpoint_type"` ChunkSize fs.SizeSuffix `config:"chunk_size"` @@ -1127,44 +1134,35 @@ func min(x, y int64) int64 { return y } -// removeSegments removes any old segments from o -// -// if except is passed in then segments with that prefix won't be deleted -func (o *Object) removeSegments(except string) error { - segmentsContainer, _, err := o.getSegmentsDlo() +func (o *Object) getSegmentsLargeObject() (map[string][]string, error) { + container, objectName := o.split() + segmentContainer, segmentObjects, err := o.fs.c.LargeObjectGetSegments(container, objectName) if err != nil { - return err + fs.Debugf(o, "Failed to get list segments of object: %v", err) + return nil, err } - except = path.Join(o.remote, except) - // fs.Debugf(o, "segmentsContainer %q prefix %q", segmentsContainer, prefix) - err = o.fs.listContainerRoot(segmentsContainer, o.remote, "", false, true, true, func(remote string, object *swift.Object, isDirectory bool) error { - if isDirectory { - return nil + var containerSegments = make(map[string][]string) + for _, segment := range segmentObjects { + if _, ok := containerSegments[segmentContainer]; !ok { + containerSegments[segmentContainer] = make([]string, 0, len(segmentObjects)) } - if except != "" && strings.HasPrefix(remote, except) { - // fs.Debugf(o, "Ignoring current segment file %q in container %q", remote, segmentsContainer) - return nil - } - fs.Debugf(o, "Removing segment file %q in container %q", remote, segmentsContainer) - var err error - return o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ObjectDelete(segmentsContainer, remote) - return shouldRetry(err) - }) - }) - if err != nil { - return err + segments, _ := containerSegments[segmentContainer] + segments = append(segments, segment.Name) + containerSegments[segmentContainer] = segments } - // remove the segments container if empty, ignore errors - err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ContainerDelete(segmentsContainer) - if err == swift.ContainerNotFound || err == swift.ContainerNotEmpty { - return false, err + return containerSegments, nil +} + +func (o *Object) removeSegmentsLargeObject(containerSegments map[string][]string) error { + if containerSegments == nil || len(containerSegments) <= 0 { + return nil + } + for container, segments := range containerSegments { + _, err := o.fs.c.BulkDelete(container, segments) + if err != nil { + fs.Debugf(o, "Failed to delete bulk segments %v", err) + return err } - return shouldRetry(err) - }) - if err == nil { - fs.Debugf(o, "Removed empty container %q", segmentsContainer) } return nil } @@ -1194,7 +1192,7 @@ func urlEncode(str string) string { var buf bytes.Buffer for i := 0; i < len(str); i++ { c := str[i] - if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '/' || c == '.' { + if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '/' || c == '.' || c == '_' || c == '-' { _ = buf.WriteByte(c) } else { _, _ = buf.WriteString(fmt.Sprintf("%%%02X", c)) @@ -1234,10 +1232,20 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size) segmentsPath := path.Join(containerPath, uniquePrefix) in := bufio.NewReader(in0) - segmentInfos := make([]string, 0, ((size / int64(o.fs.opt.ChunkSize)) + 1)) + segmentInfos := make([]string, 0, (size/int64(o.fs.opt.ChunkSize))+1) + defer atexit.OnError(&err, func() { + if o.fs.opt.LeavePartsOnError { + return + } + fs.Debugf(o, "Delete segments when err raise %v", err) + if segmentInfos == nil || len(segmentInfos) == 0 { + return + } + deleteChunks(o, segmentsContainer, segmentInfos) + })() for { // can we read at least one byte? - if _, err := in.Peek(1); err != nil { + if _, err = in.Peek(1); err != nil { if left > 0 { return "", err // read less than expected } @@ -1262,8 +1270,6 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, return shouldRetryHeaders(rxHeaders, err) }) if err != nil { - deleteChunks(o, segmentsContainer, segmentInfos) - segmentInfos = nil return "", err } i++ @@ -1277,21 +1283,23 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, emptyReader, true, "", contentType, headers) return shouldRetryHeaders(rxHeaders, err) }) - if err != nil { - deleteChunks(o, segmentsContainer, segmentInfos) + + if err == nil { + //reset data segmentInfos = nil } return uniquePrefix + "/", err } func deleteChunks(o *Object, segmentsContainer string, segmentInfos []string) { - if segmentInfos != nil && len(segmentInfos) > 0 { - for _, v := range segmentInfos { - fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer) - e := o.fs.c.ObjectDelete(segmentsContainer, v) - if e != nil { - fs.Errorf(o, "Error occurred in delete segment file %q on %q, error: %q", v, segmentsContainer, e) - } + if segmentInfos == nil || len(segmentInfos) == 0 { + return + } + for _, v := range segmentInfos { + fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer) + e := o.fs.c.ObjectDelete(segmentsContainer, v) + if e != nil { + fs.Errorf(o, "Error occurred in delete segment file %q on %q, error: %q", v, segmentsContainer, e) } } } @@ -1312,20 +1320,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op modTime := src.ModTime(ctx) // Note whether this is a dynamic large object before starting - isDynamicLargeObject, err := o.isDynamicLargeObject() + isLargeObject, err := o.isLargeObject() if err != nil { return err } + //capture segments before upload + var segmentsContainer map[string][]string + if isLargeObject { + segmentsContainer, _ = o.getSegmentsLargeObject() + } + // Set the mtime m := swift.Metadata{} m.SetModTime(modTime) contentType := fs.MimeType(ctx, src) headers := m.ObjectHeaders() fs.OpenOptionAddHeaders(options, headers) - uniquePrefix := "" + if size > int64(o.fs.opt.ChunkSize) || (size == -1 && !o.fs.opt.NoChunk) { - uniquePrefix, err = o.updateChunks(in, headers, size, contentType) + _, err = o.updateChunks(in, headers, size, contentType) if err != nil { return err } @@ -1359,10 +1373,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op o.size = int64(inCount.BytesRead()) } } - - // If file was a dynamic large object then remove old/all segments - if isDynamicLargeObject { - err = o.removeSegments(uniquePrefix) + isInContainerVersioning, _ := o.isInContainerVersioning(container) + // If file was a large object and the container is not enable versioning then remove old/all segments + if isLargeObject && len(segmentsContainer) > 0 && !isInContainerVersioning { + err := o.removeSegmentsLargeObject(segmentsContainer) if err != nil { fs.Logf(o, "Failed to remove old segments - carrying on with upload: %v", err) } @@ -1389,15 +1403,10 @@ func (o *Object) Remove(ctx context.Context) (err error) { return err } } - isStaticLargeObject, err := o.isStaticLargeObject() - if err != nil { - return err - } - var segmentContainer string - var segmentObjects []swift.Object - - if isStaticLargeObject { - segmentContainer, segmentObjects, err = o.fs.c.LargeObjectGetSegments(container, containerPath) + //capture segments object if this object is large object + var containerSegments map[string][]string + if isLargeObject { + containerSegments, err = o.getSegmentsLargeObject() if err != nil { return err } @@ -1415,31 +1424,9 @@ func (o *Object) Remove(ctx context.Context) (err error) { return nil } - isDynamicLargeObject, err := o.isDynamicLargeObject() - if err != nil { - return err + if isLargeObject { + return o.removeSegmentsLargeObject(containerSegments) } - // ...then segments if required - //delete segment for dynamic large object - if isDynamicLargeObject { - return o.removeSegments("") - } - - //delete segment for static large object - if isStaticLargeObject && len(segmentContainer) > 0 && segmentObjects != nil && len(segmentObjects) > 0 { - var segmentNames []string - for _, segmentObject := range segmentObjects { - if len(segmentObject.Name) == 0 { - continue - } - segmentNames = append(segmentNames, segmentObject.Name) - } - _, err := o.fs.c.BulkDelete(segmentContainer, segmentNames) - if err != nil { - return err - } - } - return nil } diff --git a/backend/swift/swift_test.go b/backend/swift/swift_test.go index 3e6b196b8..29a8a344c 100644 --- a/backend/swift/swift_test.go +++ b/backend/swift/swift_test.go @@ -4,15 +4,19 @@ package swift import ( "bytes" "context" + "errors" "io" + "io/ioutil" "testing" + "github.com/ncw/swift" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" "github.com/rclone/rclone/lib/random" + "github.com/rclone/rclone/lib/readers" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -74,6 +78,80 @@ func (f *Fs) testNoChunk(t *testing.T) { // Additional tests that aren't in the framework func (f *Fs) InternalTest(t *testing.T) { t.Run("NoChunk", f.testNoChunk) + t.Run("WithChunk", f.testWithChunk) + t.Run("WithChunkFail", f.testWithChunkFail) +} + +func (f *Fs) testWithChunk(t *testing.T) { + preConfChunkSize := f.opt.ChunkSize + preConfChunk := f.opt.NoChunk + f.opt.NoChunk = false + f.opt.ChunkSize = 1024 * fs.Byte + defer func() { + //restore old config after test + f.opt.ChunkSize = preConfChunkSize + f.opt.NoChunk = preConfChunk + }() + + file := fstest.Item{ + ModTime: fstest.Time("2020-12-31T04:05:06.499999999Z"), + Path: "piped data chunk.txt", + Size: -1, // use unknown size during upload + } + const contentSize = 2048 + contents := random.String(contentSize) + buf := bytes.NewBufferString(contents) + uploadHash := hash.NewMultiHasher() + in := io.TeeReader(buf, uploadHash) + + file.Size = -1 + obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil) + ctx := context.TODO() + obj, err := f.Features().PutStream(ctx, in, obji) + require.NoError(t, err) + require.NotEmpty(t, obj) +} + +func (f *Fs) testWithChunkFail(t *testing.T) { + preConfChunkSize := f.opt.ChunkSize + preConfChunk := f.opt.NoChunk + f.opt.NoChunk = false + f.opt.ChunkSize = 1024 * fs.Byte + segmentContainer := f.root + "_segments" + defer func() { + //restore config + f.opt.ChunkSize = preConfChunkSize + f.opt.NoChunk = preConfChunk + }() + path := "piped data chunk with error.txt" + file := fstest.Item{ + ModTime: fstest.Time("2021-01-04T03:46:00.499999999Z"), + Path: path, + Size: -1, // use unknown size during upload + } + const contentSize = 4096 + const errPosition = 3072 + contents := random.String(contentSize) + buf := bytes.NewBufferString(contents[:errPosition]) + errMessage := "potato" + er := &readers.ErrorReader{Err: errors.New(errMessage)} + in := ioutil.NopCloser(io.MultiReader(buf, er)) + + file.Size = contentSize + obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil) + ctx := context.TODO() + _, err := f.Features().PutStream(ctx, in, obji) + // error is potato + require.NotNil(t, err) + require.Equal(t, errMessage, err.Error()) + _, _, err = f.c.Object(f.rootContainer, path) + assert.Equal(t, swift.ObjectNotFound, err) + prefix := path + objs, err := f.c.Objects(segmentContainer, &swift.ObjectsOpts{ + Prefix: prefix, + }) + require.NoError(t, err) + require.Empty(t, objs) } var _ fstests.InternalTester = (*Fs)(nil)