//go:build !plan9
// +build !plan9

package hdfs

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/user"
	"path"
	"strings"
	"time"

	"github.com/colinmarc/hdfs/v2"
	krb "github.com/jcmturner/gokrb5/v8/client"
	"github.com/jcmturner/gokrb5/v8/config"
	"github.com/jcmturner/gokrb5/v8/credentials"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/config/configstruct"
	"github.com/rclone/rclone/fs/hash"
)

// Fs represents a HDFS server
type Fs struct {
	name     string
	root     string
	features *fs.Features   // optional features
	opt      Options        // options for this backend
	ci       *fs.ConfigInfo // global config
	client   *hdfs.Client
}

// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
func getKerberosClient() (*krb.Client, error) {
	configPath := os.Getenv("KRB5_CONFIG")
	if configPath == "" {
		configPath = "/etc/krb5.conf"
	}

	cfg, err := config.Load(configPath)
	if err != nil {
		return nil, err
	}

	// Determine the ccache location from the environment, falling back to the
	// default location.
	ccachePath := os.Getenv("KRB5CCNAME")
	if strings.Contains(ccachePath, ":") {
		if strings.HasPrefix(ccachePath, "FILE:") {
			ccachePath = strings.SplitN(ccachePath, ":", 2)[1]
		} else {
			return nil, fmt.Errorf("unusable ccache: %s", ccachePath)
		}
	} else if ccachePath == "" {
		u, err := user.Current()
		if err != nil {
			return nil, err
		}

		ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
	}

	ccache, err := credentials.LoadCCache(ccachePath)
	if err != nil {
		return nil, err
	}

	client, err := krb.NewFromCCache(ccache, cfg)
	if err != nil {
		return nil, err
	}

	return client, nil
}

// NewFs constructs an Fs from the path
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}

	options := hdfs.ClientOptions{
		Addresses:           []string{opt.Namenode},
		UseDatanodeHostname: false,
	}

	if opt.ServicePrincipalName != "" {
		options.KerberosClient, err = getKerberosClient()
		if err != nil {
			return nil, fmt.Errorf("problem with kerberos authentication: %w", err)
		}
		options.KerberosServicePrincipleName = opt.ServicePrincipalName

		if opt.DataTransferProtection != "" {
			options.DataTransferProtection = opt.DataTransferProtection
		}
	} else {
		options.User = opt.Username
	}

	client, err := hdfs.NewClient(options)
	if err != nil {
		return nil, err
	}

	f := &Fs{
		name:   name,
		root:   root,
		opt:    *opt,
		ci:     fs.GetConfig(ctx),
		client: client,
	}

	f.features = (&fs.Features{
		CanHaveEmptyDirectories: true,
	}).Fill(ctx, f)

	info, err := f.client.Stat(f.realpath(""))
	if err == nil && !info.IsDir() {
		f.root = path.Dir(f.root)
		return f, fs.ErrorIsFile
	}

	return f, nil
}

// Name of this fs
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}

// String returns a description of the FS
func (f *Fs) String() string {
	return fmt.Sprintf("hdfs://%s", f.opt.Namenode)
}

// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}

// Precision return the precision of this Fs
func (f *Fs) Precision() time.Duration {
	return time.Second
}

// Hashes are not supported
func (f *Fs) Hashes() hash.Set {
	return hash.Set(hash.None)
}

// NewObject finds file at remote or return fs.ErrorObjectNotFound
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	realpath := f.realpath(remote)
	fs.Debugf(f, "new [%s]", realpath)

	info, err := f.ensureFile(realpath)
	if err != nil {
		return nil, err
	}

	return &Object{
		fs:      f,
		remote:  remote,
		size:    info.Size(),
		modTime: info.ModTime(),
	}, nil
}

// List the objects and directories in dir into entries.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	realpath := f.realpath(dir)
	fs.Debugf(f, "list [%s]", realpath)

	err = f.ensureDirectory(realpath)
	if err != nil {
		return nil, err
	}

	list, err := f.client.ReadDir(realpath)
	if err != nil {
		return nil, err
	}
	for _, x := range list {
		stdName := f.opt.Enc.ToStandardName(x.Name())
		remote := path.Join(dir, stdName)
		if x.IsDir() {
			entries = append(entries, fs.NewDir(remote, x.ModTime()))
		} else {
			entries = append(entries, &Object{
				fs:      f,
				remote:  remote,
				size:    x.Size(),
				modTime: x.ModTime()})
		}
	}
	return entries, nil
}

// Put the object
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	o := &Object{
		fs:     f,
		remote: src.Remote(),
	}
	err := o.Update(ctx, in, src, options...)
	return o, err
}

// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.Put(ctx, in, src, options...)
}

// Mkdir makes a directory
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	fs.Debugf(f, "mkdir [%s]", f.realpath(dir))
	return f.client.MkdirAll(f.realpath(dir), 0755)
}

// Rmdir deletes the directory
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	realpath := f.realpath(dir)
	fs.Debugf(f, "rmdir [%s]", realpath)

	err := f.ensureDirectory(realpath)
	if err != nil {
		return err
	}

	// do not remove empty directory
	list, err := f.client.ReadDir(realpath)
	if err != nil {
		return err
	}
	if len(list) > 0 {
		return fs.ErrorDirectoryNotEmpty
	}

	return f.client.Remove(realpath)
}

// Purge deletes all the files in the directory
func (f *Fs) Purge(ctx context.Context, dir string) error {
	realpath := f.realpath(dir)
	fs.Debugf(f, "purge [%s]", realpath)

	err := f.ensureDirectory(realpath)
	if err != nil {
		return err
	}

	return f.client.RemoveAll(realpath)
}

// Move src to this remote using server-side move operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't move - not same remote type")
		return nil, fs.ErrorCantMove
	}

	// Get the real paths from the remote specs:
	sourcePath := srcObj.fs.realpath(srcObj.remote)
	targetPath := f.realpath(remote)
	fs.Debugf(f, "rename [%s] to [%s]", sourcePath, targetPath)

	// Make sure the target folder exists:
	dirname := path.Dir(targetPath)
	err := f.client.MkdirAll(dirname, 0755)
	if err != nil {
		return nil, err
	}

	// Do the move
	// Note that the underlying HDFS library hard-codes Overwrite=True, but this is expected rclone behaviour.
	err = f.client.Rename(sourcePath, targetPath)
	if err != nil {
		return nil, err
	}

	// Look up the resulting object
	info, err := f.client.Stat(targetPath)
	if err != nil {
		return nil, err
	}

	// And return it:
	return &Object{
		fs:      f,
		remote:  remote,
		size:    info.Size(),
		modTime: info.ModTime(),
	}, nil
}

// DirMove moves src, srcRemote to this remote at dstRemote
// using server-side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
	srcFs, ok := src.(*Fs)
	if !ok {
		return fs.ErrorCantDirMove
	}

	// Get the real paths from the remote specs:
	sourcePath := srcFs.realpath(srcRemote)
	targetPath := f.realpath(dstRemote)
	fs.Debugf(f, "rename [%s] to [%s]", sourcePath, targetPath)

	// Check if the destination exists:
	info, err := f.client.Stat(targetPath)
	if err == nil {
		fs.Debugf(f, "target directory already exits, IsDir = [%t]", info.IsDir())
		return fs.ErrorDirExists
	}

	// Make sure the targets parent folder exists:
	dirname := path.Dir(targetPath)
	err = f.client.MkdirAll(dirname, 0755)
	if err != nil {
		return err
	}

	// Do the move
	err = f.client.Rename(sourcePath, targetPath)
	if err != nil {
		return err
	}

	return nil
}

// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
	info, err := f.client.StatFs()
	if err != nil {
		return nil, err
	}
	return &fs.Usage{
		Total: fs.NewUsageValue(int64(info.Capacity)),
		Used:  fs.NewUsageValue(int64(info.Used)),
		Free:  fs.NewUsageValue(int64(info.Remaining)),
	}, nil
}

func (f *Fs) ensureDirectory(realpath string) error {
	info, err := f.client.Stat(realpath)

	if e, ok := err.(*os.PathError); ok && e.Err == os.ErrNotExist {
		return fs.ErrorDirNotFound
	}
	if err != nil {
		return err
	}
	if !info.IsDir() {
		return fs.ErrorDirNotFound
	}

	return nil
}

func (f *Fs) ensureFile(realpath string) (os.FileInfo, error) {
	info, err := f.client.Stat(realpath)

	if e, ok := err.(*os.PathError); ok && e.Err == os.ErrNotExist {
		return nil, fs.ErrorObjectNotFound
	}
	if err != nil {
		return nil, err
	}
	if info.IsDir() {
		return nil, fs.ErrorObjectNotFound
	}

	return info, nil
}

func (f *Fs) realpath(dir string) string {
	return f.opt.Enc.FromStandardPath(xPath(f.Root(), dir))
}

// Check the interfaces are satisfied
var (
	_ fs.Fs          = (*Fs)(nil)
	_ fs.Purger      = (*Fs)(nil)
	_ fs.PutStreamer = (*Fs)(nil)
	_ fs.Abouter     = (*Fs)(nil)
	_ fs.Mover       = (*Fs)(nil)
	_ fs.DirMover    = (*Fs)(nil)
)