From d6870473a15cc62d7530527f0199ebd1d4688884 Mon Sep 17 00:00:00 2001 From: nguyenhuuluan434 Date: Tue, 2 Mar 2021 15:58:14 +0700 Subject: [PATCH] swift: implement copying large objects --- backend/swift/swift.go | 123 ++++++++++++++++++++++++++++++++++-- backend/swift/swift_test.go | 36 +++++++++++ go.mod | 2 +- 3 files changed, 153 insertions(+), 8 deletions(-) diff --git a/backend/swift/swift.go b/backend/swift/swift.go index 1f1032915..203deae35 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/ncw/swift/v2" "github.com/pkg/errors" "github.com/rclone/rclone/fs" @@ -905,18 +906,125 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - srcContainer, srcPath := srcObj.split() - err = f.pacer.Call(func() (bool, error) { - var rxHeaders swift.Headers - rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil) - return shouldRetryHeaders(ctx, rxHeaders, err) - }) + isLargeObject, err := srcObj.isLargeObject(ctx) + if err != nil { + return nil, err + } + if isLargeObject { + /*handle large object*/ + err = copyLargeObject(ctx, f, srcObj, dstContainer, dstPath) + } else { + srcContainer, srcPath := srcObj.split() + err = f.pacer.Call(func() (bool, error) { + var rxHeaders swift.Headers + rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil) + return shouldRetryHeaders(ctx, rxHeaders, err) + }) + } if err != nil { return nil, err } return f.NewObject(ctx, remote) } +func copyLargeObject(ctx context.Context, f *Fs, src *Object, dstContainer string, dstPath string) error { + segmentsContainer := dstContainer + "_segments" + err := f.makeContainer(ctx, segmentsContainer) + if err != nil { + return err + } + segments, err := src.getSegmentsLargeObject(ctx) + if err != nil { + return err + } + if len(segments) == 0 { + return errors.New("could not copy object, list segments are empty") + } + nanoSeconds := time.Now().Nanosecond() + prefixSegment := fmt.Sprintf("%v/%v/%s", nanoSeconds, src.size, strings.ReplaceAll(uuid.New().String(), "-", "")) + copiedSegmentsLen := 10 + for _, value := range segments { + if len(value) <= 0 { + continue + } + fragment := value[0] + if len(fragment) <= 0 { + continue + } + copiedSegmentsLen = len(value) + firstIndex := strings.Index(fragment, "/") + if firstIndex < 0 { + firstIndex = 0 + } else { + firstIndex = firstIndex + 1 + } + lastIndex := strings.LastIndex(fragment, "/") + if lastIndex < 0 { + lastIndex = len(fragment) + } else { + lastIndex = lastIndex - 1 + } + prefixSegment = fragment[firstIndex:lastIndex] + break + } + copiedSegments := make([]string, copiedSegmentsLen) + defer handleCopyFail(ctx, f, segmentsContainer, copiedSegments, err) + for c, ss := range segments { + if len(ss) <= 0 { + continue + } + for _, s := range ss { + lastIndex := strings.LastIndex(s, "/") + if lastIndex <= 0 { + lastIndex = 0 + } else { + lastIndex = lastIndex + 1 + } + segmentName := dstPath + "/" + prefixSegment + "/" + s[lastIndex:] + err = f.pacer.Call(func() (bool, error) { + var rxHeaders swift.Headers + rxHeaders, err = f.c.ObjectCopy(ctx, c, s, segmentsContainer, segmentName, nil) + copiedSegments = append(copiedSegments, segmentName) + return shouldRetryHeaders(ctx, rxHeaders, err) + }) + if err != nil { + return err + } + } + } + m := swift.Metadata{} + headers := m.ObjectHeaders() + headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s/%s", segmentsContainer, dstPath, prefixSegment)) + headers["Content-Length"] = "0" + emptyReader := bytes.NewReader(nil) + err = f.pacer.Call(func() (bool, error) { + var rxHeaders swift.Headers + rxHeaders, err = f.c.ObjectPut(ctx, dstContainer, dstPath, emptyReader, true, "", src.contentType, headers) + return shouldRetryHeaders(ctx, rxHeaders, err) + }) + return err +} + +//remove copied segments when copy process failed +func handleCopyFail(ctx context.Context, f *Fs, segmentsContainer string, segments []string, err error) { + fs.Debugf(f, "handle copy segment fail") + if err == nil { + return + } + if len(segmentsContainer) == 0 { + fs.Debugf(f, "invalid segments container") + return + } + if len(segments) == 0 { + fs.Debugf(f, "segments is empty") + return + } + fs.Debugf(f, "action delete segments what copied") + for _, v := range segments { + _ = f.c.ObjectDelete(ctx, segmentsContainer, v) + } +} + // Hashes returns the supported hash sets. func (f *Fs) Hashes() hash.Set { return hash.Set(hash.MD5) @@ -1244,7 +1352,8 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift. if segmentInfos == nil || len(segmentInfos) == 0 { return } - deleteChunks(ctx, o, segmentsContainer, segmentInfos) + _ctx := context.Background() + deleteChunks(_ctx, o, segmentsContainer, segmentInfos) })() for { // can we read at least one byte? diff --git a/backend/swift/swift_test.go b/backend/swift/swift_test.go index 96ab0c1c6..cd3632517 100644 --- a/backend/swift/swift_test.go +++ b/backend/swift/swift_test.go @@ -80,6 +80,7 @@ func (f *Fs) InternalTest(t *testing.T) { t.Run("NoChunk", f.testNoChunk) t.Run("WithChunk", f.testWithChunk) t.Run("WithChunkFail", f.testWithChunkFail) + t.Run("CopyLargeObject", f.testCopyLargeObject) } func (f *Fs) testWithChunk(t *testing.T) { @@ -154,4 +155,39 @@ func (f *Fs) testWithChunkFail(t *testing.T) { require.Empty(t, objs) } +func (f *Fs) testCopyLargeObject(t *testing.T) { + preConfChunkSize := f.opt.ChunkSize + preConfChunk := f.opt.NoChunk + f.opt.NoChunk = false + f.opt.ChunkSize = 1024 * fs.Byte + defer func() { + //restore old config after test + f.opt.ChunkSize = preConfChunkSize + f.opt.NoChunk = preConfChunk + }() + + file := fstest.Item{ + ModTime: fstest.Time("2020-12-31T04:05:06.499999999Z"), + Path: "large.txt", + Size: -1, // use unknown size during upload + } + const contentSize = 2048 + contents := random.String(contentSize) + buf := bytes.NewBufferString(contents) + uploadHash := hash.NewMultiHasher() + in := io.TeeReader(buf, uploadHash) + + file.Size = -1 + obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil) + ctx := context.TODO() + obj, err := f.Features().PutStream(ctx, in, obji) + require.NoError(t, err) + require.NotEmpty(t, obj) + remoteTarget := "large.txt (copy)" + objTarget, err := f.Features().Copy(ctx, obj, remoteTarget) + require.NoError(t, err) + require.NotEmpty(t, objTarget) + require.Equal(t, obj.Size(), objTarget.Size()) +} + var _ fstests.InternalTester = (*Fs)(nil) diff --git a/go.mod b/go.mod index 65e44fc1f..818593849 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/gabriel-vasile/mimetype v1.1.2 github.com/gogo/protobuf v1.3.2 // indirect github.com/google/go-querystring v1.0.0 // indirect - github.com/google/uuid v1.2.0 // indirect + github.com/google/uuid v1.2.0 github.com/hanwen/go-fuse/v2 v2.0.3 github.com/iguanesolutions/go-systemd/v5 v5.0.0 github.com/jcmturner/gokrb5/v8 v8.4.2