Add rotating buffer functionality to the s3 driver

This commit is contained in:
Andrey Kostov 2015-01-26 17:51:59 -08:00
parent f0e0a080e9
commit c05e4682f8

View file

@ -284,6 +284,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
partNumber := 1 partNumber := 1
bytesRead := 0 bytesRead := 0
var putErrChan chan error
parts := []s3.Part{} parts := []s3.Part{}
var part s3.Part var part s3.Part
@ -303,6 +304,12 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress // multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing. // made prior to the machine crashing.
defer func() { defer func() {
if putErrChan != nil {
if putErr := <-putErrChan; putErr != nil {
err = putErr
}
}
if len(parts) > 0 { if len(parts) > 0 {
if multi == nil { if multi == nil {
// Parts should be empty if the multi is not initialized // Parts should be empty if the multi is not initialized
@ -356,16 +363,31 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
} }
} }
if putErrChan == nil {
putErrChan = make(chan error)
} else {
if putErr := <-putErrChan; putErr != nil {
putErrChan = nil
return putErr
}
}
go func(bytesRead int, from int64, buf []byte) {
// parts and partNumber are safe, because this function is the only one modifying them and we
// force it to be executed serially.
if bytesRead > 0 { if bytesRead > 0 {
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if err != nil { if putErr != nil {
return err putErrChan <- putErr
} }
parts = append(parts, part) parts = append(parts, part)
partNumber++ partNumber++
} }
putErrChan <- nil
}(bytesRead, from, buf)
buf = make([]byte, d.ChunkSize)
return nil return nil
} }