sftp: Fix remote race on creating directories
Because there is a period of time between checking a directory needs creating and creating it, the leads to errors where directories are attempting to be created twice. Add locking on a per directory basis to fix while doing mkdir.
This commit is contained in:
parent
726cb43be9
commit
16d91246c4
3 changed files with 95 additions and 0 deletions
|
@ -60,6 +60,7 @@ type Fs struct {
|
|||
url string
|
||||
sshClient *ssh.Client
|
||||
sftpClient *sftp.Client
|
||||
mkdirLock *stringLock
|
||||
}
|
||||
|
||||
// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading)
|
||||
|
@ -138,6 +139,7 @@ func NewFs(name, root string) (fs.Fs, error) {
|
|||
sshClient: sshClient,
|
||||
sftpClient: sftpClient,
|
||||
url: "sftp://" + user + "@" + host + ":" + port + "/" + root,
|
||||
mkdirLock: newStringLock(),
|
||||
}
|
||||
f.features = (&fs.Features{}).Fill(f)
|
||||
if root != "" {
|
||||
|
@ -321,6 +323,8 @@ func (f *Fs) mkParentDir(remote string) error {
|
|||
|
||||
// mkdir makes the directory and parents using native paths
|
||||
func (f *Fs) mkdir(path string) error {
|
||||
f.mkdirLock.Lock(path)
|
||||
defer f.mkdirLock.Unlock(path)
|
||||
if path == "." || path == "/" {
|
||||
return nil
|
||||
}
|
||||
|
|
51
sftp/stringlock.go
Normal file
51
sftp/stringlock.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package sftp
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
)
|
||||
|
||||
// stringLock locks for string IDs passed in
|
||||
type stringLock struct {
|
||||
mu sync.Mutex // mutex to protect below
|
||||
locks map[string]chan struct{} // map of locks
|
||||
}
|
||||
|
||||
// newStringLock creates a stringLock
|
||||
func newStringLock() *stringLock {
|
||||
return &stringLock{
|
||||
locks: make(map[string]chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Lock locks on the id passed in
|
||||
func (l *stringLock) Lock(ID string) {
|
||||
l.mu.Lock()
|
||||
for {
|
||||
ch, ok := l.locks[ID]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// Wait for the channel to be closed
|
||||
l.mu.Unlock()
|
||||
fs.Log(nil, "Waiting for stringLock on %q", ID)
|
||||
<-ch
|
||||
l.mu.Lock()
|
||||
}
|
||||
l.locks[ID] = make(chan struct{})
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
||||
// Unlock unlocks on the id passed in. Will panic if Lock with the
|
||||
// given id wasn't called first.
|
||||
func (l *stringLock) Unlock(ID string) {
|
||||
l.mu.Lock()
|
||||
ch, ok := l.locks[ID]
|
||||
if !ok {
|
||||
panic("stringLock: Unlock before Lock")
|
||||
}
|
||||
close(ch)
|
||||
delete(l.locks, ID)
|
||||
l.mu.Unlock()
|
||||
}
|
40
sftp/stringlock_test.go
Normal file
40
sftp/stringlock_test.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package sftp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestStringLock(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
counter := [3]int{}
|
||||
lock := newStringLock()
|
||||
const (
|
||||
outer = 10
|
||||
inner = 100
|
||||
total = outer * inner
|
||||
)
|
||||
for k := 0; k < outer; k++ {
|
||||
for j := range counter {
|
||||
wg.Add(1)
|
||||
go func(j int) {
|
||||
defer wg.Done()
|
||||
ID := fmt.Sprintf("%d", j)
|
||||
for i := 0; i < inner; i++ {
|
||||
lock.Lock(ID)
|
||||
n := counter[j]
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
counter[j] = n + 1
|
||||
lock.Unlock(ID)
|
||||
}
|
||||
|
||||
}(j)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
assert.Equal(t, [3]int{total, total, total}, counter)
|
||||
}
|
Loading…
Reference in a new issue