Merge pull request #109 from AndreyKostov/storagedriver-s3-rotating-buffer

Add a rotating buffer functionality to the s3 driver
pull/77/head
Olivier Gambier 2015-01-27 14:27:24 -08:00
commit 769df8dabe
1 changed files with 29 additions and 7 deletions

View File

@ -284,6 +284,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
partNumber := 1
bytesRead := 0
var putErrChan chan error
parts := []s3.Part{}
var part s3.Part
@ -303,6 +304,12 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing.
defer func() {
if putErrChan != nil {
if putErr := <-putErrChan; putErr != nil {
err = putErr
}
}
if len(parts) > 0 {
if multi == nil {
// Parts should be empty if the multi is not initialized
@ -356,16 +363,31 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
}
}
if bytesRead > 0 {
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if err != nil {
return err
if putErrChan == nil {
putErrChan = make(chan error)
} else {
if putErr := <-putErrChan; putErr != nil {
putErrChan = nil
return putErr
}
parts = append(parts, part)
partNumber++
}
go func(bytesRead int, from int64, buf []byte) {
// parts and partNumber are safe, because this function is the only one modifying them and we
// force it to be executed serially.
if bytesRead > 0 {
part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if putErr != nil {
putErrChan <- putErr
}
parts = append(parts, part)
partNumber++
}
putErrChan <- nil
}(bytesRead, from, buf)
buf = make([]byte, d.ChunkSize)
return nil
}