diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index c63b1d6b..2c3ec16b 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -284,6 +284,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total partNumber := 1 bytesRead := 0 + var putErrChan chan error parts := []s3.Part{} var part s3.Part @@ -303,6 +304,12 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total // multipart upload, which will eventually be cleaned up, but we will lose all of the progress // made prior to the machine crashing. defer func() { + if putErrChan != nil { + if putErr := <-putErrChan; putErr != nil { + err = putErr + } + } + if len(parts) > 0 { if multi == nil { // Parts should be empty if the multi is not initialized @@ -356,16 +363,31 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total } } - if bytesRead > 0 { - part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) - if err != nil { - return err + if putErrChan == nil { + putErrChan = make(chan error) + } else { + if putErr := <-putErrChan; putErr != nil { + putErrChan = nil + return putErr } - - parts = append(parts, part) - partNumber++ } + go func(bytesRead int, from int64, buf []byte) { + // parts and partNumber are safe, because this function is the only one modifying them and we + // force it to be executed serially. + if bytesRead > 0 { + part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) + if putErr != nil { + putErrChan <- putErr + } + + parts = append(parts, part) + partNumber++ + } + putErrChan <- nil + }(bytesRead, from, buf) + + buf = make([]byte, d.ChunkSize) return nil }