From 5d911e94508e5d0ebb0702d28a1ab128da95706f Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 29 Jul 2017 22:05:36 +0100 Subject: [PATCH] pacer: Factor TokenDispenser into pacer from box remote --- box/box.go | 41 +++++++++++++---------------------------- box/upload.go | 4 ++-- pacer/tokens.go | 31 +++++++++++++++++++++++++++++++ pacer/tokens_test.go | 16 ++++++++++++++++ 4 files changed, 62 insertions(+), 30 deletions(-) create mode 100644 pacer/tokens.go create mode 100644 pacer/tokens_test.go diff --git a/box/box.go b/box/box.go index a963fcdf0..6af16ce43 100644 --- a/box/box.go +++ b/box/box.go @@ -85,14 +85,14 @@ func init() { // Fs represents a remote box type Fs struct { - name string // name of this remote - root string // the path we are working on - features *fs.Features // optional features - srv *rest.Client // the connection to the one drive server - dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // pacer for API calls - tokenRenewer *oauthutil.Renew // renew the token on expiry - uploadTokens chan struct{} // control concurrency of multipart uploads + name string // name of this remote + root string // the path we are working on + features *fs.Features // optional features + srv *rest.Client // the connection to the one drive server + dirCache *dircache.DirCache // Map of directory path to directory id + pacer *pacer.Pacer // pacer for API calls + tokenRenewer *oauthutil.Renew // renew the token on expiry + uploadToken *pacer.TokenDispenser // control concurrency } // Object describes a box object @@ -238,19 +238,15 @@ func NewFs(name, root string) (fs.Fs, error) { } f := &Fs{ - name: name, - root: root, - srv: rest.NewClient(oAuthClient).SetRoot(rootURL), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), - uploadTokens: make(chan struct{}, fs.Config.Transfers), + name: name, + root: root, + srv: rest.NewClient(oAuthClient).SetRoot(rootURL), + pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), } f.features = (&fs.Features{CaseInsensitive: true}).Fill(f) f.srv.SetErrorHandler(errorHandler) - // Fill up the upload tokens - for i := 0; i < fs.Config.Transfers; i++ { - f.uploadTokens <- struct{}{} - } // Renew the token in the background f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { _, err := f.readMetaDataForPath("") @@ -296,17 +292,6 @@ func (f *Fs) rootSlash() string { return f.root + "/" } -// getUploadToken gets a token from the upload pool. -func (f *Fs) getUploadToken() { - <-f.uploadTokens - return -} - -// putUploadToken returns a token to the pool -func (f *Fs) putUploadToken() { - f.uploadTokens <- struct{}{} -} - // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. diff --git a/box/upload.go b/box/upload.go index 4812fbbad..0ee7f12c6 100644 --- a/box/upload.go +++ b/box/upload.go @@ -214,10 +214,10 @@ outer: // Transfer the chunk wg.Add(1) + o.fs.uploadToken.Get() go func(part int, position int64) { defer wg.Done() - o.fs.getUploadToken() - defer o.fs.putUploadToken() + defer o.fs.uploadToken.Put() fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize)) partResponse, err := o.uploadPart(session.ID, position, size, buf) if err != nil { diff --git a/pacer/tokens.go b/pacer/tokens.go new file mode 100644 index 000000000..b4f905ba9 --- /dev/null +++ b/pacer/tokens.go @@ -0,0 +1,31 @@ +// Tokens for controlling concurrency + +package pacer + +// TokenDispenser is for controlling concurrency +type TokenDispenser struct { + tokens chan struct{} +} + +// NewTokenDispenser makes a pool of n tokens +func NewTokenDispenser(n int) *TokenDispenser { + td := &TokenDispenser{ + tokens: make(chan struct{}, n), + } + // Fill up the upload tokens + for i := 0; i < n; i++ { + td.tokens <- struct{}{} + } + return td +} + +// Get gets a token from the pool - don't forget to return it with Put +func (td *TokenDispenser) Get() { + <-td.tokens + return +} + +// Put returns a token +func (td *TokenDispenser) Put() { + td.tokens <- struct{}{} +} diff --git a/pacer/tokens_test.go b/pacer/tokens_test.go new file mode 100644 index 000000000..715c5c072 --- /dev/null +++ b/pacer/tokens_test.go @@ -0,0 +1,16 @@ +package pacer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTokenDispenser(t *testing.T) { + td := NewTokenDispenser(5) + assert.Equal(t, 5, len(td.tokens)) + td.Get() + assert.Equal(t, 4, len(td.tokens)) + td.Put() + assert.Equal(t, 5, len(td.tokens)) +}