From ac9cb50fdb9aa06342d58da61dd15687ade2e8d8 Mon Sep 17 00:00:00 2001 From: Maciej Zimnoch Date: Wed, 19 Feb 2020 11:17:25 +0100 Subject: [PATCH] backend/s3: use memory pool for buffer allocations Currently each multipart upload allocated his own buffers, which after file upload was garbaged. Next files couldn't leverage already allocated memory which resulted in inefficent memory management. This change introduces backend memory pool keeping memory chunks which can be used during object operations. Fixes #3967 --- backend/s3/s3.go | 96 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 32 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 94462b4bc..c76c21f89 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -56,6 +56,7 @@ import ( "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/pool" "github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/rest" "golang.org/x/sync/errgroup" @@ -841,11 +842,24 @@ In Ceph, this can be increased with the "rgw list buckets max chunk" option. // - doubled / encoding // - trailing / encoding // so that AWS keys are always valid file names - Default: (encoder.EncodeInvalidUtf8 | + Default: encoder.EncodeInvalidUtf8 | encoder.EncodeSlash | - encoder.EncodeDot), - }}, - }) + encoder.EncodeDot, + }, { + Name: "memory_pool_flush_time", + Default: memoryPoolFlushTime, + Advanced: true, + Help: `How often internal memory buffer pools will be flushed. + +Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. +This option controls how often unused buffers will be removed from the pool.`, + }, { + Name: "memory_pool_use_mmap", + Default: memoryPoolUseMmap, + Advanced: true, + Help: `Whether to use mmap buffers in internal memory pool.`, + }, + }}) } // Constants @@ -859,6 +873,9 @@ const ( defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024) minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep. + + memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long + memoryPoolUseMmap = false ) // Options defines the configuration for this backend @@ -887,21 +904,25 @@ type Options struct { LeavePartsOnError bool `config:"leave_parts_on_error"` ListChunk int64 `config:"list_chunk"` Enc encoder.MultiEncoder `config:"encoding"` + MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"` + MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"` } // Fs represents a remote s3 server type Fs struct { - name string // the name of the remote - root string // root of the bucket - ignore all objects above this - opt Options // parsed options - features *fs.Features // optional features - c *s3.S3 // the connection to the s3 server - ses *session.Session // the s3 session - rootBucket string // bucket part of root (if any) - rootDirectory string // directory part of root (if any) - cache *bucket.Cache // cache for bucket creation status - pacer *fs.Pacer // To pace the API calls - srv *http.Client // a plain http client + name string // the name of the remote + root string // root of the bucket - ignore all objects above this + opt Options // parsed options + features *fs.Features // optional features + c *s3.S3 // the connection to the s3 server + ses *session.Session // the s3 session + rootBucket string // bucket part of root (if any) + rootDirectory string // directory part of root (if any) + cache *bucket.Cache // cache for bucket creation status + pacer *fs.Pacer // To pace the API calls + srv *http.Client // a plain http client + poolMu sync.Mutex // mutex protecting memory pools map + pools map[int64]*pool.Pool // memory pools } // Object describes a s3 object @@ -1180,6 +1201,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, err } + f := &Fs{ name: name, opt: *opt, @@ -1188,7 +1210,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))), cache: bucket.NewCache(), srv: fshttp.NewClient(fs.Config), + pools: make(map[int64]*pool.Pool), } + f.setRoot(root) f.features = (&fs.Features{ ReadMimeType: true, @@ -1875,6 +1899,22 @@ func (f *Fs) Hashes() hash.Set { return hash.Set(hash.MD5) } +func (f *Fs) getMemoryPool(size int64) *pool.Pool { + f.poolMu.Lock() + defer f.poolMu.Unlock() + + _, ok := f.pools[size] + if !ok { + f.pools[size] = pool.New( + time.Duration(f.opt.MemoryPoolFlushTime), + int(f.opt.ChunkSize), + f.opt.UploadConcurrency*fs.Config.Transfers, + f.opt.MemoryPoolUseMmap, + ) + } + return f.pools[size] +} + // ------------------------------------------------------------ // Fs returns the parent Fs @@ -2078,16 +2118,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si if concurrency < 1 { concurrency = 1 } - bufs := make(chan []byte, concurrency) - defer func() { - // empty the channel on exit - close(bufs) - for range bufs { - } - }() - for i := 0; i < concurrency; i++ { - bufs <- nil - } + tokens := pacer.NewTokenDispenser(concurrency) // calculate size of parts partSize := int(f.opt.ChunkSize) @@ -2108,6 +2139,8 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si } } + memPool := f.getMemoryPool(int64(partSize)) + var cout *s3.CreateMultipartUploadOutput err = f.pacer.Call(func() (bool, error) { var err error @@ -2159,11 +2192,9 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si ) for partNum := int64(1); !finished; partNum++ { - // Get a block of memory from the channel (which limits concurrency) - buf := <-bufs - if buf == nil { - buf = make([]byte, partSize) - } + // Get a block of memory from the pool and token which limits concurrency. + tokens.Get() + buf := memPool.Get() // Fail fast, in case an errgroup managed function returns an error // gCtx is cancelled. There is no point in uploading all the other parts. @@ -2226,8 +2257,9 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return false, nil }) - // return the memory - bufs <- buf[:partSize] + // return the memory and token + memPool.Put(buf[:partSize]) + tokens.Put() if err != nil { return errors.Wrap(err, "multipart upload failed to upload part")