azureblob: fix memory usage by upgrading the SDK and implementing a TransferManager

In the Azure SDK there was a bug which caused excessive memory use
when doing repeated transfers:

https://github.com/Azure/azure-storage-blob-go/issues/233

This patch updates the SDK to v0.13.0 which allowed us to implement a
custom TransferManager which integrates with rclone's memory pool.

This fixes the excessive memory consumption.

See: https://forum.rclone.org/t/ask-for-settings-recommendation-for-azureblob/21505/
This commit is contained in:
Nick Craig-Wood 2021-01-13 10:49:57 +00:00
parent 2f67681e3b
commit 189ef5f257
3 changed files with 48 additions and 2 deletions

View file

@ -49,6 +49,7 @@ const (
storageDefaultBaseURL = "blob.core.windows.net"
defaultChunkSize = 4 * fs.MebiByte
maxChunkSize = 100 * fs.MebiByte
uploadConcurrency = 4
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
// Default storage account, key and blob endpoint for emulator support,
@ -1492,6 +1493,48 @@ func increment(xs []byte) {
}
}
// poolWrapper wraps a pool.Pool as an azblob.TransferManager
type poolWrapper struct {
pool *pool.Pool
bufToken chan struct{}
runToken chan struct{}
}
// newPoolWrapper creates an azblob.TransferManager that will use a
// pool.Pool with maximum concurrency as specified.
func (f *Fs) newPoolWrapper(concurrency int) azblob.TransferManager {
return &poolWrapper{
pool: f.pool,
bufToken: make(chan struct{}, concurrency),
runToken: make(chan struct{}, concurrency),
}
}
// Get implements TransferManager.Get().
func (pw *poolWrapper) Get() []byte {
pw.bufToken <- struct{}{}
return pw.pool.Get()
}
// Put implements TransferManager.Put().
func (pw *poolWrapper) Put(b []byte) {
pw.pool.Put(b)
<-pw.bufToken
}
// Run implements TransferManager.Run().
func (pw *poolWrapper) Run(f func()) {
pw.runToken <- struct{}{}
go func() {
f()
<-pw.runToken
}()
}
// Close implements TransferManager.Close().
func (pw *poolWrapper) Close() {
}
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
@ -1538,9 +1581,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(o.fs.opt.ChunkSize),
MaxBuffers: 4,
MaxBuffers: uploadConcurrency,
Metadata: o.meta,
BlobHTTPHeaders: httpHeaders,
TransferManager: o.fs.newPoolWrapper(uploadConcurrency),
}
// Don't retry, return a retry error instead