azureblob: implement memory pooling to control memory use

This commit implements memory pooling to control excessive memory use,
mirroring what was previously implemented in the s3 backend.
Nick Craig-Wood 2020-04-25 18:36:18 +01:00
parent ee7219aa20
commit 37a53570d4
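
The change reuses rclone's lib/pool package: instead of allocating a fresh []byte for every multipart chunk, uploads borrow fixed-size buffers from a shared pool and hand them back when the part finishes, and buffers left unused for memory_pool_flush_time are released. A minimal sketch of that Get/Put lifecycle, using only the pool.New, Get and Put calls that appear in the diff below (the buffer size, pool size, and flush time here are illustrative, not rclone's defaults):

package main

import (
	"time"

	"github.com/rclone/rclone/lib/pool"
)

func main() {
	// Pool of reusable 4 MiB buffers, caching at most 4 at a time;
	// buffers unused for a minute are freed (illustrative values).
	bufPool := pool.New(time.Minute, 4*1024*1024, 4, false)

	buf := bufPool.Get() // reuse a cached buffer; allocate only on a miss
	// ... read one chunk into buf and upload it ...
	bufPool.Put(buf) // hand the buffer back for the next chunk
}

Sizing the pool to fs.Config.Transfers, as the diff does, caps steady-state buffer memory at roughly ChunkSize × Transfers rather than letting it grow with the number of parts.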

@@ -35,6 +35,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/pool"
 )
 
 const (
@@ -59,6 +60,8 @@ const (
 	emulatorAccount      = "devstoreaccount1"
 	emulatorAccountKey   = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 	emulatorBlobEndpoint = "http://127.0.0.1:10000/devstoreaccount1"
+	memoryPoolFlushTime  = fs.Duration(time.Minute) // flush the cached buffers after this long
+	memoryPoolUseMmap    = false
 )
 
 // Register with Fs
@@ -135,6 +138,18 @@ for data integrity checking but can cause long delays for large files
 to start uploading.`,
 			Default:  false,
 			Advanced: true,
+		}, {
+			Name:     "memory_pool_flush_time",
+			Default:  memoryPoolFlushTime,
+			Advanced: true,
+			Help: `How often internal memory buffer pools will be flushed.
+Uploads which require additional buffers (e.g. multipart) will use the memory pool for allocations.
+This option controls how often unused buffers will be removed from the pool.`,
+		}, {
+			Name:     "memory_pool_use_mmap",
+			Default:  memoryPoolUseMmap,
+			Advanced: true,
+			Help:     `Whether to use mmap buffers in internal memory pool.`,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
@@ -151,17 +166,19 @@ to start uploading.`,
 
 // Options defines the configuration for this backend
 type Options struct {
 	Account             string               `config:"account"`
 	Key                 string               `config:"key"`
 	Endpoint            string               `config:"endpoint"`
 	SASURL              string               `config:"sas_url"`
 	UploadCutoff        fs.SizeSuffix        `config:"upload_cutoff"`
 	ChunkSize           fs.SizeSuffix        `config:"chunk_size"`
 	ListChunkSize       uint                 `config:"list_chunk"`
 	AccessTier          string               `config:"access_tier"`
 	UseEmulator         bool                 `config:"use_emulator"`
 	DisableCheckSum     bool                 `config:"disable_checksum"`
-	Enc                 encoder.MultiEncoder `config:"encoding"`
+	MemoryPoolFlushTime fs.Duration          `config:"memory_pool_flush_time"`
+	MemoryPoolUseMmap   bool                 `config:"memory_pool_use_mmap"`
+	Enc                 encoder.MultiEncoder `config:"encoding"`
 }
 
 // Fs represents a remote azure server
@@ -180,6 +197,7 @@ type Fs struct {
 	cache       *bucket.Cache         // cache for container creation status
 	pacer       *fs.Pacer             // To pace and retry the API calls
 	uploadToken *pacer.TokenDispenser // control concurrency
+	pool        *pool.Pool            // memory pool
 }
 
 // Object describes a azure object
@@ -399,6 +417,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		client:      fshttp.NewClient(fs.Config),
 		cache:       bucket.NewCache(),
 		cntURLcache: make(map[string]*azblob.ContainerURL, 1),
+		pool: pool.New(
+			time.Duration(opt.MemoryPoolFlushTime),
+			int(opt.ChunkSize),
+			fs.Config.Transfers,
+			opt.MemoryPoolUseMmap,
+		),
 	}
 	f.setRoot(root)
 	f.features = (&fs.Features{
@@ -995,6 +1019,19 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
 	return f.NewObject(ctx, remote)
 }
 
+func (f *Fs) getMemoryPool(size int64) *pool.Pool {
+	if size == int64(f.opt.ChunkSize) {
+		return f.pool
+	}
+
+	return pool.New(
+		time.Duration(f.opt.MemoryPoolFlushTime),
+		int(size),
+		fs.Config.Transfers,
+		f.opt.MemoryPoolUseMmap,
+	)
+}
+
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
@@ -1303,6 +1340,7 @@ func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL,
 	position := int64(0)
 	errs := make(chan error, 1)
 	var wg sync.WaitGroup
+	memPool := o.fs.getMemoryPool(chunkSize)
 outer:
 	for part := 0; part < int(totalParts); part++ {
 		// Check any errors
@@ -1317,23 +1355,27 @@ outer:
 			reqSize = chunkSize
 		}
 
-		// Make a block of memory
-		buf := make([]byte, reqSize)
+		// Get a block of memory from the pool and a token which limits concurrency
+		o.fs.uploadToken.Get()
+		buf := memPool.Get()
+		buf = buf[:reqSize]
 
 		// Read the chunk
 		_, err = io.ReadFull(in, buf)
 		if err != nil {
 			err = errors.Wrap(err, "multipart upload failed to read source")
+			memPool.Put(buf)       // return the buf
+			o.fs.uploadToken.Put() // return the token
 			break outer
 		}
 
 		// Transfer the chunk
 		nextID()
 		wg.Add(1)
-		o.fs.uploadToken.Get()
 		go func(part int, position int64, blockID string) {
 			defer wg.Done()
 			defer o.fs.uploadToken.Put()
+			defer memPool.Put(buf)
 			fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
 
 			// Upload the block, with MD5 for check