From 0d0bcdac311bb0b849aa2d68f5d0267ba11fbbb9 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 19 Aug 2023 17:30:55 +0100 Subject: [PATCH] fs: add context.Ctx to ChunkWriter methods WriteChunk in particular needs a different context from that which OpenChunkWriter was used with so add it to all the methods. --- backend/s3/s3.go | 12 ++++++------ fs/features.go | 6 +++--- fs/operations/multithread.go | 16 +++++++--------- fstest/fstests/fstests.go | 8 ++++---- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 351285d31..c5722b85e 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -5426,7 +5426,7 @@ func (w *s3ChunkWriter) addMd5(md5binary *[]byte, chunkNumber int64) { } // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 -func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) { +func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { if chunkNumber < 0 { err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber) return -1, err @@ -5488,7 +5488,7 @@ func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64 } // Abort the multpart upload -func (w *s3ChunkWriter) Abort() error { +func (w *s3ChunkWriter) Abort(ctx context.Context) error { err := w.f.pacer.Call(func() (bool, error) { _, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{ Bucket: w.bucket, @@ -5506,7 +5506,7 @@ func (w *s3ChunkWriter) Abort() error { } // Close and finalise the multipart upload -func (w *s3ChunkWriter) Close() (err error) { +func (w *s3ChunkWriter) Close(ctx context.Context) (err error) { // sort the completed parts by part number sort.Slice(w.completedParts, func(i, j int) bool { return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber @@ -5561,7 +5561,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R return } fs.Debugf(o, "Cancelling multipart upload") - errCancel := chunkWriter.Abort() + errCancel := chunkWriter.Abort(ctx) if errCancel != nil { fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) } @@ -5611,7 +5611,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R off += int64(n) g.Go(func() (err error) { defer free() - _, err = chunkWriter.WriteChunk(int(partNum), bytes.NewReader(buf)) + _, err = chunkWriter.WriteChunk(gCtx, int(partNum), bytes.NewReader(buf)) return err }) } @@ -5620,7 +5620,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R return wantETag, gotETag, nil, err } - err = chunkWriter.Close() + err = chunkWriter.Close(ctx) if err != nil { return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err) } diff --git a/fs/features.go b/fs/features.go index 019dcfcdc..560b6edd9 100644 --- a/fs/features.go +++ b/fs/features.go @@ -652,13 +652,13 @@ type OpenChunkWriterFn func(ctx context.Context, remote string, src ObjectInfo, type ChunkWriter interface { // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 - WriteChunk(chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) + WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) // Close complete chunked writer - Close() error + Close(ctx context.Context) error // Abort chunk write - Abort() error + Abort(ctx context.Context) error } // UserInfoer is an optional interface for Fs diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index a4a357e13..6882f23c4 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -86,7 +86,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ // FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations // Also allocating for copy to local which doesn't need it - bytesWritten, err := writer.WriteChunk(stream, readers.NewRepeatableReader(rc)) + bytesWritten, err := writer.WriteChunk(ctx, stream, readers.NewRepeatableReader(rc)) if err != nil { return err } @@ -175,7 +175,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, } err = g.Wait() - closeErr := chunkWriter.Close() + closeErr := chunkWriter.Close(ctx) if err != nil { return nil, err } @@ -224,7 +224,6 @@ func (o *offsetWriter) Write(p []byte) (n int, err error) { // writerAtChunkWriter converts a WriterAtCloser into a ChunkWriter type writerAtChunkWriter struct { - ctx context.Context remote string size int64 writerAt fs.WriterAtCloser @@ -235,7 +234,7 @@ type writerAtChunkWriter struct { } // WriteChunk writes chunkNumber from reader -func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) { +func (w writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { fs.Debugf(w.remote, "writing chunk %v", chunkNumber) bytesToWrite := w.chunkSize @@ -266,17 +265,17 @@ func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) ( } // Close the chunk writing -func (w writerAtChunkWriter) Close() error { +func (w writerAtChunkWriter) Close(ctx context.Context) error { return w.writerAt.Close() } // Abort the chunk writing -func (w writerAtChunkWriter) Abort() error { - obj, err := w.f.NewObject(w.ctx, w.remote) +func (w writerAtChunkWriter) Abort(ctx context.Context) error { + obj, err := w.f.NewObject(ctx, w.remote) if err != nil { return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err) } - return obj.Remove(w.ctx) + return obj.Remove(ctx) } // openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize @@ -292,7 +291,6 @@ func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize i } chunkWriter := &writerAtChunkWriter{ - ctx: ctx, remote: remote, size: src.Size(), chunkSize: chunkSize, diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index 5ac7978cf..3d63290d8 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -809,17 +809,17 @@ func Run(t *testing.T, opt *Opt) { require.NoError(t, err) var n int64 - n, err = out.WriteChunk(1, strings.NewReader(contents2)) + n, err = out.WriteChunk(ctx, 1, strings.NewReader(contents2)) assert.NoError(t, err) assert.Equal(t, int64(size5MBs), n) - n, err = out.WriteChunk(2, strings.NewReader(contents3)) + n, err = out.WriteChunk(ctx, 2, strings.NewReader(contents3)) assert.NoError(t, err) assert.Equal(t, int64(size1MB), n) - n, err = out.WriteChunk(0, strings.NewReader(contents1)) + n, err = out.WriteChunk(ctx, 0, strings.NewReader(contents1)) assert.NoError(t, err) assert.Equal(t, int64(size5MBs), n) - assert.NoError(t, out.Close()) + assert.NoError(t, out.Close(ctx)) obj := findObject(ctx, t, f, path) originalContents := contents1 + contents2 + contents3