forked from TrueCloudLab/rclone
71edc75ca6
This includes an HDFS docker image to use with the integration tests. Co-authored-by: Ivan Andreev <ivandeex@gmail.com> Co-authored-by: Nick Craig-Wood <nick@craig-wood.com>
257 lines
5.5 KiB
Go
257 lines
5.5 KiB
Go
// +build !plan9
|
|
|
|
package hdfs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/colinmarc/hdfs/v2"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/config/configmap"
|
|
"github.com/rclone/rclone/fs/config/configstruct"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
)
|
|
|
|
// Fs represents a HDFS server
|
|
type Fs struct {
|
|
name string
|
|
root string
|
|
features *fs.Features // optional features
|
|
opt Options // options for this backend
|
|
ci *fs.ConfigInfo // global config
|
|
client *hdfs.Client
|
|
}
|
|
|
|
// NewFs constructs an Fs from the path
|
|
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|
opt := new(Options)
|
|
err := configstruct.Set(m, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client, err := hdfs.NewClient(hdfs.ClientOptions{
|
|
Addresses: []string{opt.Namenode},
|
|
User: opt.Username,
|
|
UseDatanodeHostname: false,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
f := &Fs{
|
|
name: name,
|
|
root: root,
|
|
opt: *opt,
|
|
ci: fs.GetConfig(ctx),
|
|
client: client,
|
|
}
|
|
|
|
f.features = (&fs.Features{
|
|
CanHaveEmptyDirectories: true,
|
|
}).Fill(ctx, f)
|
|
|
|
info, err := f.client.Stat(f.realpath(""))
|
|
if err == nil && !info.IsDir() {
|
|
f.root = path.Dir(f.root)
|
|
return f, fs.ErrorIsFile
|
|
}
|
|
|
|
return f, nil
|
|
}
|
|
|
|
// Name of this fs
|
|
func (f *Fs) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Root of the remote (as passed into NewFs)
|
|
func (f *Fs) Root() string {
|
|
return f.root
|
|
}
|
|
|
|
// String returns a description of the FS
|
|
func (f *Fs) String() string {
|
|
return fmt.Sprintf("hdfs://%s", f.opt.Namenode)
|
|
}
|
|
|
|
// Features returns the optional features of this Fs
|
|
func (f *Fs) Features() *fs.Features {
|
|
return f.features
|
|
}
|
|
|
|
// Precision return the precision of this Fs
|
|
func (f *Fs) Precision() time.Duration {
|
|
return time.Second
|
|
}
|
|
|
|
// Hashes are not supported
|
|
func (f *Fs) Hashes() hash.Set {
|
|
return hash.Set(hash.None)
|
|
}
|
|
|
|
// NewObject finds file at remote or return fs.ErrorObjectNotFound
|
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
|
realpath := f.realpath(remote)
|
|
fs.Debugf(f, "new [%s]", realpath)
|
|
|
|
info, err := f.ensureFile(realpath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Object{
|
|
fs: f,
|
|
remote: remote,
|
|
size: info.Size(),
|
|
modTime: info.ModTime(),
|
|
}, nil
|
|
}
|
|
|
|
// List the objects and directories in dir into entries.
|
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
|
realpath := f.realpath(dir)
|
|
fs.Debugf(f, "list [%s]", realpath)
|
|
|
|
err = f.ensureDirectory(realpath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
list, err := f.client.ReadDir(realpath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, x := range list {
|
|
stdName := f.opt.Enc.ToStandardName(x.Name())
|
|
remote := path.Join(dir, stdName)
|
|
if x.IsDir() {
|
|
entries = append(entries, fs.NewDir(remote, x.ModTime()))
|
|
} else {
|
|
entries = append(entries, &Object{
|
|
fs: f,
|
|
remote: remote,
|
|
size: x.Size(),
|
|
modTime: x.ModTime()})
|
|
}
|
|
}
|
|
return entries, nil
|
|
}
|
|
|
|
// Put the object
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
o := &Object{
|
|
fs: f,
|
|
remote: src.Remote(),
|
|
}
|
|
err := o.Update(ctx, in, src, options...)
|
|
return o, err
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
return f.Put(ctx, in, src, options...)
|
|
}
|
|
|
|
// Mkdir makes a directory
|
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
|
fs.Debugf(f, "mkdir [%s]", f.realpath(dir))
|
|
return f.client.MkdirAll(f.realpath(dir), 0755)
|
|
}
|
|
|
|
// Rmdir deletes the directory
|
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
|
realpath := f.realpath(dir)
|
|
fs.Debugf(f, "rmdir [%s]", realpath)
|
|
|
|
err := f.ensureDirectory(realpath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// do not remove empty directory
|
|
list, err := f.client.ReadDir(realpath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(list) > 0 {
|
|
return fs.ErrorDirectoryNotEmpty
|
|
}
|
|
|
|
return f.client.Remove(realpath)
|
|
}
|
|
|
|
// Purge deletes all the files in the directory
|
|
func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|
realpath := f.realpath(dir)
|
|
fs.Debugf(f, "purge [%s]", realpath)
|
|
|
|
err := f.ensureDirectory(realpath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return f.client.RemoveAll(realpath)
|
|
}
|
|
|
|
// About gets quota information from the Fs
|
|
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|
info, err := f.client.StatFs()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &fs.Usage{
|
|
Total: fs.NewUsageValue(int64(info.Capacity)),
|
|
Used: fs.NewUsageValue(int64(info.Used)),
|
|
Free: fs.NewUsageValue(int64(info.Remaining)),
|
|
}, nil
|
|
}
|
|
|
|
func (f *Fs) ensureDirectory(realpath string) error {
|
|
info, err := f.client.Stat(realpath)
|
|
|
|
if e, ok := err.(*os.PathError); ok && e.Err == os.ErrNotExist {
|
|
return fs.ErrorDirNotFound
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !info.IsDir() {
|
|
return fs.ErrorDirNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *Fs) ensureFile(realpath string) (os.FileInfo, error) {
|
|
info, err := f.client.Stat(realpath)
|
|
|
|
if e, ok := err.(*os.PathError); ok && e.Err == os.ErrNotExist {
|
|
return nil, fs.ErrorObjectNotFound
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if info.IsDir() {
|
|
return nil, fs.ErrorObjectNotFound
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
func (f *Fs) realpath(dir string) string {
|
|
return f.opt.Enc.FromStandardPath(xPath(f.Root(), dir))
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.Fs = (*Fs)(nil)
|
|
_ fs.Purger = (*Fs)(nil)
|
|
_ fs.PutStreamer = (*Fs)(nil)
|
|
_ fs.Abouter = (*Fs)(nil)
|
|
)
|