fs: add context.Ctx to ChunkWriter methods
WriteChunk in particular needs a different context from that which OpenChunkWriter was used with so add it to all the methods.
This commit is contained in:
parent
f3bd02f0ef
commit
0d0bcdac31
4 changed files with 20 additions and 22 deletions
|
@ -5426,7 +5426,7 @@ func (w *s3ChunkWriter) addMd5(md5binary *[]byte, chunkNumber int64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
|
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
|
||||||
func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
|
func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
|
||||||
if chunkNumber < 0 {
|
if chunkNumber < 0 {
|
||||||
err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
|
err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
|
||||||
return -1, err
|
return -1, err
|
||||||
|
@ -5488,7 +5488,7 @@ func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abort the multpart upload
|
// Abort the multpart upload
|
||||||
func (w *s3ChunkWriter) Abort() error {
|
func (w *s3ChunkWriter) Abort(ctx context.Context) error {
|
||||||
err := w.f.pacer.Call(func() (bool, error) {
|
err := w.f.pacer.Call(func() (bool, error) {
|
||||||
_, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
|
_, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
|
||||||
Bucket: w.bucket,
|
Bucket: w.bucket,
|
||||||
|
@ -5506,7 +5506,7 @@ func (w *s3ChunkWriter) Abort() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close and finalise the multipart upload
|
// Close and finalise the multipart upload
|
||||||
func (w *s3ChunkWriter) Close() (err error) {
|
func (w *s3ChunkWriter) Close(ctx context.Context) (err error) {
|
||||||
// sort the completed parts by part number
|
// sort the completed parts by part number
|
||||||
sort.Slice(w.completedParts, func(i, j int) bool {
|
sort.Slice(w.completedParts, func(i, j int) bool {
|
||||||
return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
|
return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
|
||||||
|
@ -5561,7 +5561,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fs.Debugf(o, "Cancelling multipart upload")
|
fs.Debugf(o, "Cancelling multipart upload")
|
||||||
errCancel := chunkWriter.Abort()
|
errCancel := chunkWriter.Abort(ctx)
|
||||||
if errCancel != nil {
|
if errCancel != nil {
|
||||||
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
|
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
|
||||||
}
|
}
|
||||||
|
@ -5611,7 +5611,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
|
||||||
off += int64(n)
|
off += int64(n)
|
||||||
g.Go(func() (err error) {
|
g.Go(func() (err error) {
|
||||||
defer free()
|
defer free()
|
||||||
_, err = chunkWriter.WriteChunk(int(partNum), bytes.NewReader(buf))
|
_, err = chunkWriter.WriteChunk(gCtx, int(partNum), bytes.NewReader(buf))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -5620,7 +5620,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
|
||||||
return wantETag, gotETag, nil, err
|
return wantETag, gotETag, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = chunkWriter.Close()
|
err = chunkWriter.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
|
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -652,13 +652,13 @@ type OpenChunkWriterFn func(ctx context.Context, remote string, src ObjectInfo,
|
||||||
|
|
||||||
type ChunkWriter interface {
|
type ChunkWriter interface {
|
||||||
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
|
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
|
||||||
WriteChunk(chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error)
|
WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error)
|
||||||
|
|
||||||
// Close complete chunked writer
|
// Close complete chunked writer
|
||||||
Close() error
|
Close(ctx context.Context) error
|
||||||
|
|
||||||
// Abort chunk write
|
// Abort chunk write
|
||||||
Abort() error
|
Abort(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// UserInfoer is an optional interface for Fs
|
// UserInfoer is an optional interface for Fs
|
||||||
|
|
|
@ -86,7 +86,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
|
||||||
|
|
||||||
// FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations
|
// FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations
|
||||||
// Also allocating for copy to local which doesn't need it
|
// Also allocating for copy to local which doesn't need it
|
||||||
bytesWritten, err := writer.WriteChunk(stream, readers.NewRepeatableReader(rc))
|
bytesWritten, err := writer.WriteChunk(ctx, stream, readers.NewRepeatableReader(rc))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -175,7 +175,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = g.Wait()
|
err = g.Wait()
|
||||||
closeErr := chunkWriter.Close()
|
closeErr := chunkWriter.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -224,7 +224,6 @@ func (o *offsetWriter) Write(p []byte) (n int, err error) {
|
||||||
|
|
||||||
// writerAtChunkWriter converts a WriterAtCloser into a ChunkWriter
|
// writerAtChunkWriter converts a WriterAtCloser into a ChunkWriter
|
||||||
type writerAtChunkWriter struct {
|
type writerAtChunkWriter struct {
|
||||||
ctx context.Context
|
|
||||||
remote string
|
remote string
|
||||||
size int64
|
size int64
|
||||||
writerAt fs.WriterAtCloser
|
writerAt fs.WriterAtCloser
|
||||||
|
@ -235,7 +234,7 @@ type writerAtChunkWriter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteChunk writes chunkNumber from reader
|
// WriteChunk writes chunkNumber from reader
|
||||||
func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
|
func (w writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
|
||||||
fs.Debugf(w.remote, "writing chunk %v", chunkNumber)
|
fs.Debugf(w.remote, "writing chunk %v", chunkNumber)
|
||||||
|
|
||||||
bytesToWrite := w.chunkSize
|
bytesToWrite := w.chunkSize
|
||||||
|
@ -266,17 +265,17 @@ func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the chunk writing
|
// Close the chunk writing
|
||||||
func (w writerAtChunkWriter) Close() error {
|
func (w writerAtChunkWriter) Close(ctx context.Context) error {
|
||||||
return w.writerAt.Close()
|
return w.writerAt.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abort the chunk writing
|
// Abort the chunk writing
|
||||||
func (w writerAtChunkWriter) Abort() error {
|
func (w writerAtChunkWriter) Abort(ctx context.Context) error {
|
||||||
obj, err := w.f.NewObject(w.ctx, w.remote)
|
obj, err := w.f.NewObject(ctx, w.remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err)
|
return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err)
|
||||||
}
|
}
|
||||||
return obj.Remove(w.ctx)
|
return obj.Remove(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize
|
// openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize
|
||||||
|
@ -292,7 +291,6 @@ func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize i
|
||||||
}
|
}
|
||||||
|
|
||||||
chunkWriter := &writerAtChunkWriter{
|
chunkWriter := &writerAtChunkWriter{
|
||||||
ctx: ctx,
|
|
||||||
remote: remote,
|
remote: remote,
|
||||||
size: src.Size(),
|
size: src.Size(),
|
||||||
chunkSize: chunkSize,
|
chunkSize: chunkSize,
|
||||||
|
|
|
@ -809,17 +809,17 @@ func Run(t *testing.T, opt *Opt) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var n int64
|
var n int64
|
||||||
n, err = out.WriteChunk(1, strings.NewReader(contents2))
|
n, err = out.WriteChunk(ctx, 1, strings.NewReader(contents2))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(size5MBs), n)
|
assert.Equal(t, int64(size5MBs), n)
|
||||||
n, err = out.WriteChunk(2, strings.NewReader(contents3))
|
n, err = out.WriteChunk(ctx, 2, strings.NewReader(contents3))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(size1MB), n)
|
assert.Equal(t, int64(size1MB), n)
|
||||||
n, err = out.WriteChunk(0, strings.NewReader(contents1))
|
n, err = out.WriteChunk(ctx, 0, strings.NewReader(contents1))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(size5MBs), n)
|
assert.Equal(t, int64(size5MBs), n)
|
||||||
|
|
||||||
assert.NoError(t, out.Close())
|
assert.NoError(t, out.Close(ctx))
|
||||||
|
|
||||||
obj := findObject(ctx, t, f, path)
|
obj := findObject(ctx, t, f, path)
|
||||||
originalContents := contents1 + contents2 + contents3
|
originalContents := contents1 + contents2 + contents3
|
||||||
|
|
Loading…
Reference in a new issue