backend/s3: use memory pool for buffer allocations

Currently each multipart upload allocates its own buffers, which are
discarded and garbage collected once the file upload finishes. Subsequent
files cannot reuse the already allocated memory, which results in
inefficient memory management. This change introduces a backend memory
pool that keeps memory chunks which can be reused during object
operations.

Fixes #3967
Maciej Zimnoch 2020-02-19 11:17:25 +01:00 committed by Nick Craig-Wood
parent 4a8b548add
commit ac9cb50fdb
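
For context, a minimal sketch of the buffer reuse pattern this introduces, using the lib/pool calls visible in the diff below. The buffer size and pool size here are illustrative values, not the backend's defaults (the backend derives them from the chunk_size option and upload_concurrency * transfers):

package main

import (
	"fmt"
	"time"

	"github.com/rclone/rclone/lib/pool"
)

func main() {
	// One pool is shared by all uploads that need buffers of the same size:
	// a buffer is returned after each part and handed to the next part (or
	// the next file) instead of being garbage collected.
	const bufferSize = 5 * 1024 * 1024 // illustrative part size, not the backend default

	bufPool := pool.New(
		time.Minute, // flush unused buffers after this long (memory_pool_flush_time)
		bufferSize,  // size of each buffer
		8,           // number of buffers to keep in the pool (illustrative)
		false,       // whether to use mmap for allocations (memory_pool_use_mmap)
	)

	buf := bufPool.Get() // take a buffer; allocates only if the pool is empty
	fmt.Println("got buffer of", len(buf), "bytes")
	// ... fill buf with one part's data and upload it ...
	bufPool.Put(buf) // return it so it can be reused
}

In the backend itself the pool is created lazily per part size (getMemoryPool) and paired with a token dispenser that caps how many parts are in flight at once.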

@@ -56,6 +56,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/pool"
 	"github.com/rclone/rclone/lib/readers"
 	"github.com/rclone/rclone/lib/rest"
 	"golang.org/x/sync/errgroup"
@@ -841,11 +842,24 @@ In Ceph, this can be increased with the "rgw list buckets max chunk" option.
 			// - doubled / encoding
 			// - trailing / encoding
 			// so that AWS keys are always valid file names
-			Default: (encoder.EncodeInvalidUtf8 |
-				encoder.EncodeSlash |
-				encoder.EncodeDot),
-		}},
-	})
+			Default: encoder.EncodeInvalidUtf8 |
+				encoder.EncodeSlash |
+				encoder.EncodeDot,
+		}, {
+			Name:     "memory_pool_flush_time",
+			Default:  memoryPoolFlushTime,
+			Advanced: true,
+			Help: `How often internal memory buffer pools will be flushed.
+Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
+This option controls how often unused buffers will be removed from the pool.`,
+		}, {
+			Name:     "memory_pool_use_mmap",
+			Default:  memoryPoolUseMmap,
+			Advanced: true,
+			Help:     `Whether to use mmap buffers in internal memory pool.`,
+		},
+	}})
 }
 
 // Constants
@@ -859,6 +873,9 @@ const (
 	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
 	maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
 	minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
+
+	memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
+	memoryPoolUseMmap = false
 )
 
 // Options defines the configuration for this backend
@@ -887,21 +904,25 @@ type Options struct {
 	LeavePartsOnError bool `config:"leave_parts_on_error"`
 	ListChunk int64 `config:"list_chunk"`
 	Enc encoder.MultiEncoder `config:"encoding"`
+	MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"`
+	MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
 }
 
 // Fs represents a remote s3 server
 type Fs struct {
 	name string // the name of the remote
 	root string // root of the bucket - ignore all objects above this
 	opt Options // parsed options
 	features *fs.Features // optional features
 	c *s3.S3 // the connection to the s3 server
 	ses *session.Session // the s3 session
 	rootBucket string // bucket part of root (if any)
 	rootDirectory string // directory part of root (if any)
 	cache *bucket.Cache // cache for bucket creation status
 	pacer *fs.Pacer // To pace the API calls
 	srv *http.Client // a plain http client
+	poolMu sync.Mutex // mutex protecting memory pools map
+	pools map[int64]*pool.Pool // memory pools
 }
 
 // Object describes a s3 object
@@ -1180,6 +1201,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	f := &Fs{
 		name: name,
 		opt: *opt,
@@ -1188,7 +1210,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
 		cache: bucket.NewCache(),
 		srv: fshttp.NewClient(fs.Config),
+		pools: make(map[int64]*pool.Pool),
 	}
+
 	f.setRoot(root)
 	f.features = (&fs.Features{
 		ReadMimeType: true,
@@ -1875,6 +1899,22 @@ func (f *Fs) Hashes() hash.Set {
 	return hash.Set(hash.MD5)
 }
 
+func (f *Fs) getMemoryPool(size int64) *pool.Pool {
+	f.poolMu.Lock()
+	defer f.poolMu.Unlock()
+
+	_, ok := f.pools[size]
+	if !ok {
+		f.pools[size] = pool.New(
+			time.Duration(f.opt.MemoryPoolFlushTime),
+			int(f.opt.ChunkSize),
+			f.opt.UploadConcurrency*fs.Config.Transfers,
+			f.opt.MemoryPoolUseMmap,
+		)
+	}
+	return f.pools[size]
+}
+
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
@@ -2078,16 +2118,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
 	if concurrency < 1 {
 		concurrency = 1
 	}
-	bufs := make(chan []byte, concurrency)
-	defer func() {
-		// empty the channel on exit
-		close(bufs)
-		for range bufs {
-		}
-	}()
-	for i := 0; i < concurrency; i++ {
-		bufs <- nil
-	}
+	tokens := pacer.NewTokenDispenser(concurrency)
 
 	// calculate size of parts
 	partSize := int(f.opt.ChunkSize)
@@ -2108,6 +2139,8 @@
 		}
 	}
 
+	memPool := f.getMemoryPool(int64(partSize))
+
 	var cout *s3.CreateMultipartUploadOutput
 	err = f.pacer.Call(func() (bool, error) {
 		var err error
@@ -2159,11 +2192,9 @@
 	)
 
 	for partNum := int64(1); !finished; partNum++ {
-		// Get a block of memory from the channel (which limits concurrency)
-		buf := <-bufs
-		if buf == nil {
-			buf = make([]byte, partSize)
-		}
+		// Get a block of memory from the pool and token which limits concurrency.
+		tokens.Get()
+		buf := memPool.Get()
 
 		// Fail fast, in case an errgroup managed function returns an error
 		// gCtx is cancelled. There is no point in uploading all the other parts.
@@ -2226,8 +2257,9 @@
 				return false, nil
 			})
 
-			// return the memory
-			bufs <- buf[:partSize]
+			// return the memory and token
+			memPool.Put(buf[:partSize])
+			tokens.Put()
 
 			if err != nil {
 				return errors.Wrap(err, "multipart upload failed to upload part")