forked from TrueCloudLab/distribution
Merge pull request #419 from stevvooe/pool-buffers-s3
Pool buffers used in S3.WriteStream
This commit is contained in:
commit
73960f4024
1 changed files with 27 additions and 5 deletions
|
@ -23,6 +23,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/AdRoll/goamz/aws"
|
"github.com/AdRoll/goamz/aws"
|
||||||
|
@ -73,6 +74,9 @@ type driver struct {
|
||||||
ChunkSize int64
|
ChunkSize int64
|
||||||
Encrypt bool
|
Encrypt bool
|
||||||
RootDirectory string
|
RootDirectory string
|
||||||
|
|
||||||
|
pool sync.Pool // pool []byte buffers used for WriteStream
|
||||||
|
zeros []byte // shared, zero-valued buffer used for WriteStream
|
||||||
}
|
}
|
||||||
|
|
||||||
type baseEmbed struct {
|
type baseEmbed struct {
|
||||||
|
@ -239,6 +243,11 @@ func New(params DriverParameters) (*Driver, error) {
|
||||||
ChunkSize: params.ChunkSize,
|
ChunkSize: params.ChunkSize,
|
||||||
Encrypt: params.Encrypt,
|
Encrypt: params.Encrypt,
|
||||||
RootDirectory: params.RootDirectory,
|
RootDirectory: params.RootDirectory,
|
||||||
|
zeros: make([]byte, params.ChunkSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
d.pool.New = func() interface{} {
|
||||||
|
return make([]byte, d.ChunkSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Driver{
|
return &Driver{
|
||||||
|
@ -302,8 +311,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, d.ChunkSize)
|
buf := d.getbuf()
|
||||||
zeroBuf := make([]byte, d.ChunkSize)
|
|
||||||
|
|
||||||
// We never want to leave a dangling multipart upload, our only consistent state is
|
// We never want to leave a dangling multipart upload, our only consistent state is
|
||||||
// when there is a whole object at path. This is in order to remain consistent with
|
// when there is a whole object at path. This is in order to remain consistent with
|
||||||
|
@ -329,6 +337,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.putbuf(buf) // needs to be here to pick up new buf value
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Fills from 0 to total from current
|
// Fills from 0 to total from current
|
||||||
|
@ -382,6 +392,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(bytesRead int, from int64, buf []byte) {
|
go func(bytesRead int, from int64, buf []byte) {
|
||||||
|
defer d.putbuf(buf) // this buffer gets dropped after this call
|
||||||
|
|
||||||
// parts and partNumber are safe, because this function is the only one modifying them and we
|
// parts and partNumber are safe, because this function is the only one modifying them and we
|
||||||
// force it to be executed serially.
|
// force it to be executed serially.
|
||||||
if bytesRead > 0 {
|
if bytesRead > 0 {
|
||||||
|
@ -396,7 +408,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
putErrChan <- nil
|
putErrChan <- nil
|
||||||
}(bytesRead, from, buf)
|
}(bytesRead, from, buf)
|
||||||
|
|
||||||
buf = make([]byte, d.ChunkSize)
|
buf = d.getbuf() // use a new buffer for the next call
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,7 +456,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
fromZeroFillSmall := func(from, to int64) error {
|
fromZeroFillSmall := func(from, to int64) error {
|
||||||
bytesRead = 0
|
bytesRead = 0
|
||||||
for from+int64(bytesRead) < to {
|
for from+int64(bytesRead) < to {
|
||||||
nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to])
|
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
|
||||||
bytesRead += nn
|
bytesRead += nn
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -458,7 +470,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
fromZeroFillLarge := func(from, to int64) error {
|
fromZeroFillLarge := func(from, to int64) error {
|
||||||
bytesRead64 := int64(0)
|
bytesRead64 := int64(0)
|
||||||
for to-(from+bytesRead64) >= d.ChunkSize {
|
for to-(from+bytesRead64) >= d.ChunkSize {
|
||||||
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
|
part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -739,3 +751,13 @@ func getPermissions() s3.ACL {
|
||||||
func (d *driver) getContentType() string {
|
func (d *driver) getContentType() string {
|
||||||
return "application/octet-stream"
|
return "application/octet-stream"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
|
||||||
|
func (d *driver) getbuf() []byte {
|
||||||
|
return d.pool.Get().([]byte)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) putbuf(p []byte) {
|
||||||
|
copy(p, d.zeros)
|
||||||
|
d.pool.Put(p)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue