b2: constrain memory usage when doing multipart uploads #439

Each part of a multipart upload takes 96M of memory, so we make sure
that we don't user more than `--transfers` * 96M of memory buffering
the multipart uploads.

This has the consequence that some uploads may appear to be at 0% for
a while, however they will get going eventually so this won't
re-introduce #731.
This commit is contained in:
Nick Craig-Wood 2017-01-29 22:21:39 +00:00
parent 28f9b9b611
commit 916569102c
2 changed files with 25 additions and 86 deletions

View file

@ -93,8 +93,7 @@ type Fs struct {
uploads []*api.GetUploadURLResponse // result of get upload URL calls uploads []*api.GetUploadURLResponse // result of get upload URL calls
authMu sync.Mutex // lock for authorizing the account authMu sync.Mutex // lock for authorizing the account
pacer *pacer.Pacer // To pace and retry the API calls pacer *pacer.Pacer // To pace and retry the API calls
uploadTokens chan struct{} // control concurrency of uploads bufferTokens chan []byte // control concurrency of multipart uploads
extraTokens chan struct{} // extra tokens for multipart uploads
} }
// Object describes a b2 object // Object describes a b2 object
@ -253,8 +252,7 @@ func NewFs(name, root string) (fs.Fs, error) {
endpoint: endpoint, endpoint: endpoint,
srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler), srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
uploadTokens: make(chan struct{}, fs.Config.Transfers), bufferTokens: make(chan []byte, fs.Config.Transfers),
extraTokens: make(chan struct{}, fs.Config.Transfers),
} }
f.features = (&fs.Features{ReadMimeType: true, WriteMimeType: true}).Fill(f) f.features = (&fs.Features{ReadMimeType: true, WriteMimeType: true}).Fill(f)
// Set the test flag if required // Set the test flag if required
@ -263,10 +261,9 @@ func NewFs(name, root string) (fs.Fs, error) {
f.srv.SetHeader(testModeHeader, testMode) f.srv.SetHeader(testModeHeader, testMode)
fs.Debug(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) fs.Debug(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
} }
// Fill up the upload and extra tokens // Fill up the buffer tokens
for i := 0; i < fs.Config.Transfers; i++ { for i := 0; i < fs.Config.Transfers; i++ {
f.returnUploadToken() f.bufferTokens <- nil
f.extraTokens <- struct{}{}
} }
err = f.authorizeAccount() err = f.authorizeAccount()
if err != nil { if err != nil {
@ -370,77 +367,24 @@ func (f *Fs) clearUploadURL() {
f.uploadMu.Unlock() f.uploadMu.Unlock()
} }
// Gets an upload token to control the concurrency // getUploadBlock gets a block from the pool of size chunkSize
func (f *Fs) getUploadToken() { func (f *Fs) getUploadBlock() []byte {
<-f.uploadTokens buf := <-f.bufferTokens
if buf == nil {
buf = make([]byte, chunkSize)
}
// fs.Debug(f, "Getting upload block %p", buf)
return buf
} }
// Return an upload token // putUploadBlock returns a block to the pool of size chunkSize
func (f *Fs) returnUploadToken() { func (f *Fs) putUploadBlock(buf []byte) {
f.uploadTokens <- struct{}{} buf = buf[:cap(buf)]
if len(buf) != int(chunkSize) {
panic("bad blocksize returned to pool")
} }
// fs.Debug(f, "Returning upload block %p", buf)
// Help count the multipart uploads f.bufferTokens <- buf
type multipartUploadCounter struct {
f *Fs
uploadToken chan struct{}
}
// Create a new upload counter. This gets an upload token for
// exclusive use by this multipart upload - the other tokens are
// shared between all the multipart uploads.
//
// Call .finished() when done to return the upload token.
func (f *Fs) newMultipartUploadCounter() *multipartUploadCounter {
m := &multipartUploadCounter{
f: f,
uploadToken: make(chan struct{}, 1),
}
f.getUploadToken()
m.uploadToken <- struct{}{}
return m
}
// Gets an upload token for a multipart upload
//
// This gets one token only from the first class tokens. This means
// that the multiplart upload is guaranteed at least one token as
// there is one first class token per possible upload.
//
// Pass the return value to returnMultipartUploadToken
func (m *multipartUploadCounter) getMultipartUploadToken() bool {
// get the upload token by preference
select {
case <-m.uploadToken:
return true
default:
}
// ...otherwise wait for the first one to appear.
//
// If both uploadToken and extraTokens are ready at this point
// (unlikely but possible) and we get an extraToken instead of
// an uploadToken this will not cause any harm - this
// multipart upload will get an extra upload slot temporarily.
select {
case <-m.uploadToken:
return true
case <-m.f.extraTokens:
return false
}
}
// Return a multipart upload token retreived from getMultipartUploadToken
func (m *multipartUploadCounter) returnMultipartUploadToken(firstClass bool) {
if firstClass {
m.uploadToken <- struct{}{}
} else {
m.f.extraTokens <- struct{}{}
}
}
// Mark us finished with this upload counter
func (m *multipartUploadCounter) finished() {
m.f.returnUploadToken()
} }
// Return an Object from a path // Return an Object from a path
@ -1271,10 +1215,6 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
in = fd in = fd
} }
// Get upload Token
o.fs.getUploadToken()
defer o.fs.returnUploadToken()
// Get upload URL // Get upload URL
upload, err := o.fs.getUploadURL() upload, err := o.fs.getUploadURL()
if err != nil { if err != nil {

View file

@ -240,8 +240,6 @@ func (up *largeUpload) Upload() error {
errs := make(chan error, 1) errs := make(chan error, 1)
var wg sync.WaitGroup var wg sync.WaitGroup
var err error var err error
uploadCounter := up.f.newMultipartUploadCounter()
defer uploadCounter.finished()
fs.AccountByPart(up.o) // Cancel whole file accounting before reading fs.AccountByPart(up.o) // Cancel whole file accounting before reading
outer: outer:
for part := int64(1); part <= up.parts; part++ { for part := int64(1); part <= up.parts; part++ {
@ -257,20 +255,21 @@ outer:
reqSize = int64(chunkSize) reqSize = int64(chunkSize)
} }
// Get a block of memory
buf := up.f.getUploadBlock()[:reqSize]
// Read the chunk // Read the chunk
buf := make([]byte, reqSize)
_, err = io.ReadFull(up.in, buf) _, err = io.ReadFull(up.in, buf)
if err != nil { if err != nil {
up.f.putUploadBlock(buf)
break outer break outer
} }
// Transfer the chunk // Transfer the chunk
// Get upload Token
token := uploadCounter.getMultipartUploadToken()
wg.Add(1) wg.Add(1)
go func(part int64, buf []byte, token bool) { go func(part int64, buf []byte) {
defer uploadCounter.returnMultipartUploadToken(token)
defer wg.Done() defer wg.Done()
defer up.f.putUploadBlock(buf)
err := up.transferChunk(part, buf) err := up.transferChunk(part, buf)
if err != nil { if err != nil {
select { select {
@ -278,7 +277,7 @@ outer:
default: default:
} }
} }
}(part, buf, token) }(part, buf)
remaining -= reqSize remaining -= reqSize
} }