diff --git a/ftp/ftp.go b/ftp/ftp.go index fae675b4d..b856aeacc 100644 --- a/ftp/ftp.go +++ b/ftp/ftp.go @@ -1,12 +1,11 @@ // Package ftp interfaces with FTP servers package ftp -// FIXME Mover and DirMover are possible using f.c.Rename -// FIXME Should have a pool of connections rather than a global lock +// FIXME Mover and DirMover are possible using c.Rename +// FIXME support conntimeout import ( "io" - "io/ioutil" "net/textproto" "net/url" "path" @@ -19,11 +18,6 @@ import ( "github.com/pkg/errors" ) -// This mutex is only used by ftpConnection. We create a new ftp -// connection for each transfer, but we need to serialize it otherwise -// Dial() and Login() might be mixed... -var globalMux = sync.Mutex{} - // Register with Fs func init() { fs.Register(&fs.RegInfo{ @@ -48,15 +42,15 @@ func init() { // Fs represents a remote FTP server type Fs struct { - name string // name of this remote - root string // the path we are working on if any - features *fs.Features // optional features - c *ftp.ServerConn // the connection to the FTP server + name string // name of this remote + root string // the path we are working on if any + features *fs.Features // optional features url *url.URL - mu sync.Mutex user string pass string dialAddr string + poolMu sync.Mutex + pool []*ftp.ServerConn } // Object describes an FTP file @@ -98,22 +92,44 @@ func (f *Fs) Features() *fs.Features { // Open a new connection to the FTP server. func (f *Fs) ftpConnection() (*ftp.ServerConn, error) { - globalMux.Lock() - defer globalMux.Unlock() fs.Debugf(f, "Connecting to FTP server") c, err := ftp.DialTimeout(f.dialAddr, 30*time.Second) if err != nil { - fs.Errorf(nil, "Error while Dialing %s: %s", f.dialAddr, err) - return nil, err + fs.Errorf(f, "Error while Dialing %s: %s", f.dialAddr, err) + return nil, errors.Wrap(err, "ftpConnection Dial") } err = c.Login(f.user, f.pass) if err != nil { - fs.Errorf(nil, "Error while Logging in into %s: %s", f.dialAddr, err) - return nil, err + _ = c.Quit() + fs.Errorf(f, "Error while Logging in into %s: %s", f.dialAddr, err) + return nil, errors.Wrap(err, "ftpConnection Login") } return c, nil } +// Get an FTP connection from the pool, or open a new one +func (f *Fs) getFtpConnection() (c *ftp.ServerConn, err error) { + f.poolMu.Lock() + if len(f.pool) > 0 { + c = f.pool[0] + f.pool = f.pool[1:] + } + f.poolMu.Unlock() + if c != nil { + return c, nil + } + return f.ftpConnection() +} + +// Return an FTP connection to the pool +// +// It nils the pointed to connection out so it can't be reused +func (f *Fs) putFtpConnection(c **ftp.ServerConn) { + f.poolMu.Lock() + f.pool = append(f.pool, *c) + f.poolMu.Unlock() +} + // NewFs contstructs an Fs from the path, container:path func NewFs(name, root string) (ff fs.Fs, err error) { // defer fs.Trace(nil, "name=%q, root=%q", name, root)("fs=%v, err=%v", &ff, &err) @@ -147,10 +163,12 @@ func NewFs(name, root string) (ff fs.Fs, err error) { dialAddr: dialAddr, } f.features = (&fs.Features{}).Fill(f) - f.c, err = f.ftpConnection() + // Make a connection and pool it to return errors early + c, err := f.getFtpConnection() if err != nil { - return nil, err + return nil, errors.Wrap(err, "NewFs") } + f.putFtpConnection(&c) if root != "" { // Check to see if the root actually an existing file remote := path.Base(root) @@ -205,9 +223,12 @@ func (f *Fs) NewObject(remote string) (o fs.Object, err error) { dir := path.Dir(fullPath) base := path.Base(fullPath) - f.mu.Lock() - files, err := f.c.List(dir) - f.mu.Unlock() + c, err := f.getFtpConnection() + if err != nil { + return nil, errors.Wrap(err, "NewObject") + } + files, err := c.List(dir) + f.putFtpConnection(&c) if err != nil { return nil, translateErrorFile(err) } @@ -232,9 +253,13 @@ func (f *Fs) NewObject(remote string) (o fs.Object, err error) { func (f *Fs) list(out fs.ListOpts, dir string, curlevel int) { // defer fs.Trace(dir, "curlevel=%d", curlevel)("") - f.mu.Lock() - files, err := f.c.List(path.Join(f.root, dir)) - f.mu.Unlock() + c, err := f.getFtpConnection() + if err != nil { + out.SetError(errors.Wrap(err, "list")) + return + } + files, err := c.List(path.Join(f.root, dir)) + f.putFtpConnection(&c) if err != nil { out.SetError(translateErrorDir(err)) return @@ -326,9 +351,12 @@ func (f *Fs) getInfo(remote string) (fi *FileInfo, err error) { dir := path.Dir(remote) base := path.Base(remote) - f.mu.Lock() - files, err := f.c.List(dir) - f.mu.Unlock() + c, err := f.getFtpConnection() + if err != nil { + return nil, errors.Wrap(err, "getInfo") + } + files, err := c.List(dir) + f.putFtpConnection(&c) if err != nil { return nil, translateErrorFile(err) } @@ -351,9 +379,12 @@ func (f *Fs) mkdir(abspath string) error { _, err := f.getInfo(abspath) if err == fs.ErrorObjectNotFound { // fs.Debugf(f, "Trying to create directory %s", abspath) - f.mu.Lock() - err = f.c.MakeDir(abspath) - f.mu.Unlock() + c, connErr := f.getFtpConnection() + if connErr != nil { + return errors.Wrap(connErr, "mkdir") + } + err = c.MakeDir(abspath) + f.putFtpConnection(&c) } return err } @@ -384,9 +415,12 @@ func (f *Fs) Mkdir(dir string) (err error) { // Return an error if it doesn't exist or isn't empty func (f *Fs) Rmdir(dir string) error { // This is actually a recursive remove directory - f.mu.Lock() - files, err := f.c.List(path.Join(f.root, dir)) - f.mu.Unlock() + c, err := f.getFtpConnection() + if err != nil { + return errors.Wrap(err, "Rmdir") + } + files, err := c.List(path.Join(f.root, dir)) + f.putFtpConnection(&c) if err != nil { return translateErrorDir(err) } @@ -398,10 +432,13 @@ func (f *Fs) Rmdir(dir string) error { } } } - f.mu.Lock() - err = f.c.RemoveDir(path.Join(f.root, dir)) - f.mu.Unlock() - return err + c, err = f.getFtpConnection() + if err != nil { + return errors.Wrap(err, "Rmdir") + } + err = c.RemoveDir(path.Join(f.root, dir)) + f.putFtpConnection(&c) + return translateErrorDir(err) } // ------------------------------------------------------------ @@ -453,15 +490,13 @@ func (o *Object) Storable() bool { type ftpReadCloser struct { io.ReadCloser c *ftp.ServerConn + f *Fs } -// Close the FTP reader +// Close the FTP reader and return the connection to the pool func (f *ftpReadCloser) Close() error { err := f.ReadCloser.Close() - err2 := f.c.Quit() - if err == nil { - err = err2 - } + f.f.putFtpConnection(&f.c) return err } @@ -480,22 +515,16 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) { } } } - c, err := o.fs.ftpConnection() + c, err := o.fs.getFtpConnection() if err != nil { return nil, errors.Wrap(err, "open") } - fd, err := c.Retr(path) + fd, err := c.RetrFrom(path, uint64(offset)) if err != nil { + o.fs.putFtpConnection(&c) return nil, errors.Wrap(err, "open") } - rc = &ftpReadCloser{ReadCloser: fd, c: c} - if offset != 0 { - _, err = io.CopyN(ioutil.Discard, fd, offset) - if err != nil { - _ = rc.Close() - return nil, errors.Wrap(err, "open skipping bytes") - } - } + rc = &ftpReadCloser{ReadCloser: fd, c: c, f: o.fs} return rc, nil } @@ -519,17 +548,14 @@ func (o *Object) makeAllDir() error { // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { + // defer fs.Trace(o, "src=%v", src)("err=%v", &err) // Create all upper directory first... - err := o.makeAllDir() + err = o.makeAllDir() if err != nil { return errors.Wrap(err, "update mkdir") } path := path.Join(o.fs.root, o.remote) - c, err := o.fs.ftpConnection() - if err != nil { - return errors.Wrap(err, "update connect") - } // remove the file if upload failed remove := func() { removeErr := o.Remove() @@ -539,11 +565,17 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { fs.Debugf(o, "Removed after failed upload: %v", err) } } + c, err := o.fs.getFtpConnection() + if err != nil { + return errors.Wrap(err, "Update") + } err = c.Stor(path, in) if err != nil { + _ = c.Quit() remove() return errors.Wrap(err, "update stor") } + o.fs.putFtpConnection(&c) o.info, err = o.fs.getInfo(path) if err != nil { return errors.Wrap(err, "update getinfo") @@ -563,9 +595,12 @@ func (o *Object) Remove() (err error) { if info.IsDir { err = o.fs.Rmdir(o.remote) } else { - o.fs.mu.Lock() - err = o.fs.c.Delete(path) - o.fs.mu.Unlock() + c, err := o.fs.getFtpConnection() + if err != nil { + return errors.Wrap(err, "Remove") + } + err = c.Delete(path) + o.fs.putFtpConnection(&c) } return err }