Pool buffers used in S3.WriteStream

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-04-22 15:07:18 -07:00 committed by Richard
parent af0c2625e0
commit f3443f8f64

View file

@ -23,6 +23,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/AdRoll/goamz/aws" "github.com/AdRoll/goamz/aws"
@ -73,6 +74,9 @@ type driver struct {
ChunkSize int64 ChunkSize int64
Encrypt bool Encrypt bool
RootDirectory string RootDirectory string
pool sync.Pool // pool []byte buffers used for WriteStream
zeros []byte // shared, zero-valued buffer used for WriteStream
} }
type baseEmbed struct { type baseEmbed struct {
@ -239,6 +243,11 @@ func New(params DriverParameters) (*Driver, error) {
ChunkSize: params.ChunkSize, ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt, Encrypt: params.Encrypt,
RootDirectory: params.RootDirectory, RootDirectory: params.RootDirectory,
zeros: make([]byte, params.ChunkSize),
}
d.pool.New = func() interface{} {
return make([]byte, d.ChunkSize)
} }
return &Driver{ return &Driver{
@ -302,8 +311,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
return 0, err return 0, err
} }
buf := make([]byte, d.ChunkSize) buf := d.getbuf()
zeroBuf := make([]byte, d.ChunkSize)
// We never want to leave a dangling multipart upload, our only consistent state is // We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with // when there is a whole object at path. This is in order to remain consistent with
@ -329,6 +337,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
} }
} }
} }
d.putbuf(buf) // needs to be here to pick up new buf value
}() }()
// Fills from 0 to total from current // Fills from 0 to total from current
@ -382,6 +392,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
} }
go func(bytesRead int, from int64, buf []byte) { go func(bytesRead int, from int64, buf []byte) {
defer d.putbuf(buf) // this buffer gets dropped after this call
// parts and partNumber are safe, because this function is the only one modifying them and we // parts and partNumber are safe, because this function is the only one modifying them and we
// force it to be executed serially. // force it to be executed serially.
if bytesRead > 0 { if bytesRead > 0 {
@ -396,7 +408,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
putErrChan <- nil putErrChan <- nil
}(bytesRead, from, buf) }(bytesRead, from, buf)
buf = make([]byte, d.ChunkSize) buf = d.getbuf() // use a new buffer for the next call
return nil return nil
} }
@ -444,7 +456,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
fromZeroFillSmall := func(from, to int64) error { fromZeroFillSmall := func(from, to int64) error {
bytesRead = 0 bytesRead = 0
for from+int64(bytesRead) < to { for from+int64(bytesRead) < to {
nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to]) nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
bytesRead += nn bytesRead += nn
if err != nil { if err != nil {
return err return err
@ -458,7 +470,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
fromZeroFillLarge := func(from, to int64) error { fromZeroFillLarge := func(from, to int64) error {
bytesRead64 := int64(0) bytesRead64 := int64(0)
for to-(from+bytesRead64) >= d.ChunkSize { for to-(from+bytesRead64) >= d.ChunkSize {
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf)) part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
if err != nil { if err != nil {
return err return err
} }
@ -739,3 +751,13 @@ func getPermissions() s3.ACL {
func (d *driver) getContentType() string { func (d *driver) getContentType() string {
return "application/octet-stream" return "application/octet-stream"
} }
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
func (d *driver) getbuf() []byte {
return d.pool.Get().([]byte)
}
func (d *driver) putbuf(p []byte) {
copy(p, d.zeros)
d.pool.Put(p)
}