diff --git a/sftp/sftp.go b/sftp/sftp.go index 7c672d9b2..8ac1ca8b5 100644 --- a/sftp/sftp.go +++ b/sftp/sftp.go @@ -60,6 +60,7 @@ type Fs struct { url string sshClient *ssh.Client sftpClient *sftp.Client + mkdirLock *stringLock } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) @@ -138,6 +139,7 @@ func NewFs(name, root string) (fs.Fs, error) { sshClient: sshClient, sftpClient: sftpClient, url: "sftp://" + user + "@" + host + ":" + port + "/" + root, + mkdirLock: newStringLock(), } f.features = (&fs.Features{}).Fill(f) if root != "" { @@ -321,6 +323,8 @@ func (f *Fs) mkParentDir(remote string) error { // mkdir makes the directory and parents using native paths func (f *Fs) mkdir(path string) error { + f.mkdirLock.Lock(path) + defer f.mkdirLock.Unlock(path) if path == "." || path == "/" { return nil } diff --git a/sftp/stringlock.go b/sftp/stringlock.go new file mode 100644 index 000000000..d0205422b --- /dev/null +++ b/sftp/stringlock.go @@ -0,0 +1,51 @@ +package sftp + +import ( + "sync" + + "github.com/ncw/rclone/fs" +) + +// stringLock locks for string IDs passed in +type stringLock struct { + mu sync.Mutex // mutex to protect below + locks map[string]chan struct{} // map of locks +} + +// newStringLock creates a stringLock +func newStringLock() *stringLock { + return &stringLock{ + locks: make(map[string]chan struct{}), + } +} + +// Lock locks on the id passed in +func (l *stringLock) Lock(ID string) { + l.mu.Lock() + for { + ch, ok := l.locks[ID] + if !ok { + break + } + // Wait for the channel to be closed + l.mu.Unlock() + fs.Log(nil, "Waiting for stringLock on %q", ID) + <-ch + l.mu.Lock() + } + l.locks[ID] = make(chan struct{}) + l.mu.Unlock() +} + +// Unlock unlocks on the id passed in. Will panic if Lock with the +// given id wasn't called first. +func (l *stringLock) Unlock(ID string) { + l.mu.Lock() + ch, ok := l.locks[ID] + if !ok { + panic("stringLock: Unlock before Lock") + } + close(ch) + delete(l.locks, ID) + l.mu.Unlock() +} diff --git a/sftp/stringlock_test.go b/sftp/stringlock_test.go new file mode 100644 index 000000000..57179915e --- /dev/null +++ b/sftp/stringlock_test.go @@ -0,0 +1,40 @@ +package sftp + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStringLock(t *testing.T) { + var wg sync.WaitGroup + counter := [3]int{} + lock := newStringLock() + const ( + outer = 10 + inner = 100 + total = outer * inner + ) + for k := 0; k < outer; k++ { + for j := range counter { + wg.Add(1) + go func(j int) { + defer wg.Done() + ID := fmt.Sprintf("%d", j) + for i := 0; i < inner; i++ { + lock.Lock(ID) + n := counter[j] + time.Sleep(1 * time.Millisecond) + counter[j] = n + 1 + lock.Unlock(ID) + } + + }(j) + } + } + wg.Wait() + assert.Equal(t, [3]int{total, total, total}, counter) +}