//go:build !plan9 && !js

// Package qingstor provides an interface to QingStor object storage
// Home: https://www.qingcloud.com/
package qingstor

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"path"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/config"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/config/configstruct"
	"github.com/rclone/rclone/fs/fshttp"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/walk"
	"github.com/rclone/rclone/lib/bucket"
	"github.com/rclone/rclone/lib/encoder"
	qsConfig "github.com/yunify/qingstor-sdk-go/v3/config"
	qsErr "github.com/yunify/qingstor-sdk-go/v3/request/errors"
	qs "github.com/yunify/qingstor-sdk-go/v3/service"
)

// Register with Fs
func init() {
	fs.Register(&fs.RegInfo{
		Name:        "qingstor",
		Description: "QingCloud Object Storage",
		NewFs:       NewFs,
		Options: []fs.Option{{
			Name:    "env_auth",
			Help:    "Get QingStor credentials from runtime.\n\nOnly applies if access_key_id and secret_access_key is blank.",
			Default: false,
			Examples: []fs.OptionExample{{
				Value: "false",
				Help:  "Enter QingStor credentials in the next step.",
			}, {
				Value: "true",
				Help:  "Get QingStor credentials from the environment (env vars or IAM).",
			}},
		}, {
			Name:      "access_key_id",
			Help:      "QingStor Access Key ID.\n\nLeave blank for anonymous access or runtime credentials.",
			Sensitive: true,
		}, {
			Name:      "secret_access_key",
			Help:      "QingStor Secret Access Key (password).\n\nLeave blank for anonymous access or runtime credentials.",
			Sensitive: true,
		}, {
			Name: "endpoint",
			Help: "Enter an endpoint URL to connection QingStor API.\n\nLeave blank will use the default value \"https://qingstor.com:443\".",
		}, {
			Name: "zone",
			Help: "Zone to connect to.\n\nDefault is \"pek3a\".",
			Examples: []fs.OptionExample{{
				Value: "pek3a",
				Help:  "The Beijing (China) Three Zone.\nNeeds location constraint pek3a.",
			}, {
				Value: "sh1a",
				Help:  "The Shanghai (China) First Zone.\nNeeds location constraint sh1a.",
			}, {
				Value: "gd2a",
				Help:  "The Guangdong (China) Second Zone.\nNeeds location constraint gd2a.",
			}},
		}, {
			Name:     "connection_retries",
			Help:     "Number of connection retries.",
			Default:  3,
			Advanced: true,
		}, {
			Name: "upload_cutoff",
			Help: `Cutoff for switching to chunked upload.

Any files larger than this will be uploaded in chunks of chunk_size.
The minimum is 0 and the maximum is 5 GiB.`,
			Default:  defaultUploadCutoff,
			Advanced: true,
		}, {
			Name: "chunk_size",
			Help: `Chunk size to use for uploading.

When uploading files larger than upload_cutoff they will be uploaded
as multipart uploads using this chunk size.

Note that "--qingstor-upload-concurrency" chunks of this size are buffered
in memory per transfer.

If you are transferring large files over high-speed links and you have
enough memory, then increasing this will speed up the transfers.`,
			Default:  minChunkSize,
			Advanced: true,
		}, {
			Name: "upload_concurrency",
			Help: `Concurrency for multipart uploads.

This is the number of chunks of the same file that are uploaded
concurrently.

NB if you set this to > 1 then the checksums of multipart uploads
become corrupted (the uploads themselves are not corrupted though).

If you are uploading small numbers of large files over high-speed links
and these uploads do not fully utilize your bandwidth, then increasing
this may help to speed up the transfers.`,
			Default:  1,
			Advanced: true,
		}, {
			Name:     config.ConfigEncoding,
			Help:     config.ConfigEncodingHelp,
			Advanced: true,
			Default: (encoder.EncodeInvalidUtf8 |
				encoder.EncodeCtl |
				encoder.EncodeSlash),
		}},
	})
}

// Constants
const (
	listLimitSize       = 1000                   // Number of items to read at once
	maxSizeForCopy      = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
	minChunkSize        = fs.SizeSuffix(minMultiPartSize)
	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
	maxUploadCutoff     = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
)

// Globals
func timestampToTime(tp int64) time.Time {
	timeLayout := time.RFC3339Nano
	ts := time.Unix(tp, 0).Format(timeLayout)
	tm, _ := time.Parse(timeLayout, ts)
	return tm.UTC()
}

// Options defines the configuration for this backend
type Options struct {
	EnvAuth           bool                 `config:"env_auth"`
	AccessKeyID       string               `config:"access_key_id"`
	SecretAccessKey   string               `config:"secret_access_key"`
	Endpoint          string               `config:"endpoint"`
	Zone              string               `config:"zone"`
	ConnectionRetries int                  `config:"connection_retries"`
	UploadCutoff      fs.SizeSuffix        `config:"upload_cutoff"`
	ChunkSize         fs.SizeSuffix        `config:"chunk_size"`
	UploadConcurrency int                  `config:"upload_concurrency"`
	Enc               encoder.MultiEncoder `config:"encoding"`
}

// Fs represents a remote qingstor server
type Fs struct {
	name          string        // The name of the remote
	root          string        // The root is a subdir, is a special object
	opt           Options       // parsed options
	features      *fs.Features  // optional features
	svc           *qs.Service   // The connection to the qingstor server
	zone          string        // The zone we are working on
	rootBucket    string        // bucket part of root (if any)
	rootDirectory string        // directory part of root (if any)
	cache         *bucket.Cache // cache for bucket creation status
}

// Object describes a qingstor object
type Object struct {
	// Will definitely have everything but meta which may be nil
	//
	// List will read everything but meta & mimeType - to fill
	// that in you need to call readMetaData
	fs           *Fs       // what this object is part of
	remote       string    // object of remote
	etag         string    // md5sum of the object
	size         int64     // length of the object content
	mimeType     string    // ContentType of object - may be ""
	lastModified time.Time // Last modified
	encrypted    bool      // whether the object is encryption
	algo         string    // Custom encryption algorithms
}

// ------------------------------------------------------------

// parsePath parses a remote 'url'
func parsePath(path string) (root string) {
	root = strings.Trim(path, "/")
	return
}

// split returns bucket and bucketPath from the rootRelativePath
// relative to f.root
func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
	bucketName, bucketPath = bucket.Split(path.Join(f.root, rootRelativePath))
	return f.opt.Enc.FromStandardName(bucketName), f.opt.Enc.FromStandardPath(bucketPath)
}

// split returns bucket and bucketPath from the object
func (o *Object) split() (bucket, bucketPath string) {
	return o.fs.split(o.remote)
}

// Split a URL into three parts: protocol host and port
func qsParseEndpoint(endpoint string) (protocol, host, port string, err error) {
	/*
	  Pattern to match an endpoint,
	  e.g.: "http(s)://qingstor.com:443" --> "http(s)", "qingstor.com", 443
	      "http(s)//qingstor.com"      --> "http(s)", "qingstor.com", ""
	      "qingstor.com"               --> "", "qingstor.com", ""
	*/
	defer func() {
		if r := recover(); r != nil {
			switch x := r.(type) {
			case error:
				err = x
			default:
				err = nil
			}
		}
	}()
	var mather = regexp.MustCompile(`^(?:(http|https)://)*(\w+\.(?:[\w\.])*)(?::(\d{0,5}))*$`)
	parts := mather.FindStringSubmatch(endpoint)
	protocol, host, port = parts[1], parts[2], parts[3]
	return
}

// qsConnection makes a connection to qingstor
func qsServiceConnection(ctx context.Context, opt *Options) (*qs.Service, error) {
	accessKeyID := opt.AccessKeyID
	secretAccessKey := opt.SecretAccessKey

	switch {
	case opt.EnvAuth:
		// No need for empty checks if "env_auth" is true
	case accessKeyID == "" && secretAccessKey == "":
		// if no access key/secret and iam is explicitly disabled then fall back to anon interaction
	case accessKeyID == "":
		return nil, errors.New("access_key_id not found")
	case secretAccessKey == "":
		return nil, errors.New("secret_access_key not found")
	}

	protocol := "https"
	host := "qingstor.com"
	port := 443

	endpoint := opt.Endpoint
	if endpoint != "" {
		_protocol, _host, _port, err := qsParseEndpoint(endpoint)

		if err != nil {
			return nil, fmt.Errorf("the endpoint \"%s\" format error", endpoint)
		}

		if _protocol != "" {
			protocol = _protocol
		}
		host = _host
		if _port != "" {
			port, _ = strconv.Atoi(_port)
		} else if protocol == "http" {
			port = 80
		}

	}

	cf, err := qsConfig.NewDefault()
	if err != nil {
		return nil, err
	}
	cf.AccessKeyID = accessKeyID
	cf.SecretAccessKey = secretAccessKey
	cf.Protocol = protocol
	cf.Host = host
	cf.Port = port
	// unsupported in v3.1: cf.ConnectionRetries = opt.ConnectionRetries
	cf.Connection = fshttp.NewClient(ctx)

	return qs.Init(cf)
}

func checkUploadChunkSize(cs fs.SizeSuffix) error {
	if cs < minChunkSize {
		return fmt.Errorf("%s is less than %s", cs, minChunkSize)
	}
	return nil
}

func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
	err = checkUploadChunkSize(cs)
	if err == nil {
		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
	}
	return
}

func checkUploadCutoff(cs fs.SizeSuffix) error {
	if cs > maxUploadCutoff {
		return fmt.Errorf("%s is greater than %s", cs, maxUploadCutoff)
	}
	return nil
}

func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
	err = checkUploadCutoff(cs)
	if err == nil {
		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
	}
	return
}

// setRoot changes the root of the Fs
func (f *Fs) setRoot(root string) {
	f.root = parsePath(root)
	f.rootBucket, f.rootDirectory = bucket.Split(f.root)
}

// NewFs constructs an Fs from the path, bucket:path
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
	// Parse config into Options struct
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	err = checkUploadChunkSize(opt.ChunkSize)
	if err != nil {
		return nil, fmt.Errorf("qingstor: chunk size: %w", err)
	}
	err = checkUploadCutoff(opt.UploadCutoff)
	if err != nil {
		return nil, fmt.Errorf("qingstor: upload cutoff: %w", err)
	}
	svc, err := qsServiceConnection(ctx, opt)
	if err != nil {
		return nil, err
	}

	if opt.Zone == "" {
		opt.Zone = "pek3a"
	}

	f := &Fs{
		name:  name,
		opt:   *opt,
		svc:   svc,
		zone:  opt.Zone,
		cache: bucket.NewCache(),
	}
	f.setRoot(root)
	f.features = (&fs.Features{
		ReadMimeType:      true,
		WriteMimeType:     true,
		BucketBased:       true,
		BucketBasedRootOK: true,
		SlowModTime:       true,
	}).Fill(ctx, f)

	if f.rootBucket != "" && f.rootDirectory != "" {
		// Check to see if the object exists
		bucketInit, err := svc.Bucket(f.rootBucket, opt.Zone)
		if err != nil {
			return nil, err
		}
		encodedDirectory := f.opt.Enc.FromStandardPath(f.rootDirectory)
		_, err = bucketInit.HeadObject(encodedDirectory, &qs.HeadObjectInput{})
		if err == nil {
			newRoot := path.Dir(f.root)
			if newRoot == "." {
				newRoot = ""
			}
			f.setRoot(newRoot)
			// return an error with an fs which points to the parent
			return f, fs.ErrorIsFile
		}
	}
	return f, nil
}

// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}

// String converts this Fs to a string
func (f *Fs) String() string {
	if f.rootBucket == "" {
		return "QingStor root"
	}
	if f.rootDirectory == "" {
		return fmt.Sprintf("QingStor bucket %s", f.rootBucket)
	}
	return fmt.Sprintf("QingStor bucket %s path %s", f.rootBucket, f.rootDirectory)
}

// Precision of the remote
func (f *Fs) Precision() time.Duration {
	//return time.Nanosecond
	//Not supported temporary
	return fs.ModTimeNotSupported
}

// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
	return hash.Set(hash.MD5)
	//return hash.HashSet(hash.HashNone)
}

// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}

// Put created a new object
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	fsObj := &Object{
		fs:     f,
		remote: src.Remote(),
	}
	return fsObj, fsObj.Update(ctx, in, src, options...)
}

// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	dstBucket, dstPath := f.split(remote)
	err := f.makeBucket(ctx, dstBucket)
	if err != nil {
		return nil, err
	}
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't copy - not same remote type")
		return nil, fs.ErrorCantCopy
	}
	srcBucket, srcPath := srcObj.split()
	source := path.Join("/", srcBucket, srcPath)

	// fs.Debugf(f, "Copied, source key is: %s, and dst key is: %s", source, key)
	req := qs.PutObjectInput{
		XQSCopySource: &source,
	}
	bucketInit, err := f.svc.Bucket(dstBucket, f.zone)

	if err != nil {
		return nil, err
	}
	_, err = bucketInit.PutObject(dstPath, &req)
	if err != nil {
		// fs.Debugf(f, "Copy Failed, API Error: %v", err)
		return nil, err
	}
	return f.NewObject(ctx, remote)
}

// NewObject finds the Object at remote.  If it can't be found
// it returns the error fs.ErrorObjectNotFound.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	return f.newObjectWithInfo(remote, nil)
}

// Return an Object from a path
//
// If it can't be found it returns the error ErrorObjectNotFound.
func (f *Fs) newObjectWithInfo(remote string, info *qs.KeyType) (fs.Object, error) {
	o := &Object{
		fs:     f,
		remote: remote,
	}
	if info != nil {
		// Set info
		if info.Size != nil {
			o.size = *info.Size
		}

		if info.Etag != nil {
			o.etag = qs.StringValue(info.Etag)
		}
		if info.Modified == nil {
			fs.Logf(o, "Failed to read last modified")
			o.lastModified = time.Now()
		} else {
			o.lastModified = timestampToTime(int64(*info.Modified))
		}

		if info.MimeType != nil {
			o.mimeType = qs.StringValue(info.MimeType)
		}

		if info.Encrypted != nil {
			o.encrypted = qs.BoolValue(info.Encrypted)
		}

	} else {
		err := o.readMetaData() // reads info and meta, returning an error
		if err != nil {
			return nil, err
		}
	}
	return o, nil
}

// listFn is called from list to handle an object.
type listFn func(remote string, object *qs.KeyType, isDirectory bool) error

// list the objects into the function supplied
//
// dir is the starting directory, "" for root
//
// Set recurse to read sub directories
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
	if prefix != "" {
		prefix += "/"
	}
	if directory != "" {
		directory += "/"
	}
	delimiter := ""
	if !recurse {
		delimiter = "/"
	}
	maxLimit := int(listLimitSize)
	var marker *string
	for {
		bucketInit, err := f.svc.Bucket(bucket, f.zone)
		if err != nil {
			return err
		}
		req := qs.ListObjectsInput{
			Delimiter: &delimiter,
			Prefix:    &directory,
			Limit:     &maxLimit,
			Marker:    marker,
		}
		resp, err := bucketInit.ListObjects(&req)
		if err != nil {
			if e, ok := err.(*qsErr.QingStorError); ok {
				if e.StatusCode == http.StatusNotFound {
					err = fs.ErrorDirNotFound
				}
			}
			return err
		}
		if !recurse {
			for _, commonPrefix := range resp.CommonPrefixes {
				if commonPrefix == nil {
					fs.Logf(f, "Nil common prefix received")
					continue
				}
				remote := *commonPrefix
				remote = f.opt.Enc.ToStandardPath(remote)
				if !strings.HasPrefix(remote, prefix) {
					fs.Logf(f, "Odd name received %q", remote)
					continue
				}
				remote = remote[len(prefix):]
				if addBucket {
					remote = path.Join(bucket, remote)
				}
				remote = strings.TrimSuffix(remote, "/")
				err = fn(remote, &qs.KeyType{Key: &remote}, true)
				if err != nil {
					return err
				}
			}
		}

		for _, object := range resp.Keys {
			remote := qs.StringValue(object.Key)
			remote = f.opt.Enc.ToStandardPath(remote)
			if !strings.HasPrefix(remote, prefix) {
				fs.Logf(f, "Odd name received %q", remote)
				continue
			}
			remote = remote[len(prefix):]
			if addBucket {
				remote = path.Join(bucket, remote)
			}
			err = fn(remote, object, false)
			if err != nil {
				return err
			}
		}
		if resp.HasMore != nil && !*resp.HasMore {
			break
		}
		// Use NextMarker if set, otherwise use last Key
		if resp.NextMarker == nil || *resp.NextMarker == "" {
			fs.Errorf(f, "Expecting NextMarker but didn't find one")
			break
		} else {
			marker = resp.NextMarker
		}
	}
	return nil
}

// Convert a list item into a BasicInfo
func (f *Fs) itemToDirEntry(remote string, object *qs.KeyType, isDirectory bool) (fs.DirEntry, error) {
	if isDirectory {
		size := int64(0)
		if object.Size != nil {
			size = *object.Size
		}
		d := fs.NewDir(remote, time.Time{}).SetSize(size)
		return d, nil
	}
	o, err := f.newObjectWithInfo(remote, object)
	if err != nil {
		return nil, err
	}
	return o, nil
}

// listDir lists files and directories to out
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
	// List the objects and directories
	err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *qs.KeyType, isDirectory bool) error {
		entry, err := f.itemToDirEntry(remote, object, isDirectory)
		if err != nil {
			return err
		}
		if entry != nil {
			entries = append(entries, entry)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// bucket must be present if listing succeeded
	f.cache.MarkOK(bucket)
	return entries, nil
}

// listBuckets lists the buckets to out
func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) {
	req := qs.ListBucketsInput{
		Location: &f.zone,
	}
	resp, err := f.svc.ListBuckets(&req)
	if err != nil {
		return nil, err
	}

	for _, bucket := range resp.Buckets {
		d := fs.NewDir(f.opt.Enc.ToStandardName(qs.StringValue(bucket.Name)), qs.TimeValue(bucket.Created))
		entries = append(entries, d)
	}
	return entries, nil
}

// List the objects and directories in dir into entries.  The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	bucket, directory := f.split(dir)
	if bucket == "" {
		if directory != "" {
			return nil, fs.ErrorListBucketRequired
		}
		return f.listBuckets(ctx)
	}
	return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "")
}

// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order.  If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	bucket, directory := f.split(dir)
	list := walk.NewListRHelper(callback)
	listR := func(bucket, directory, prefix string, addBucket bool) error {
		return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *qs.KeyType, isDirectory bool) error {
			entry, err := f.itemToDirEntry(remote, object, isDirectory)
			if err != nil {
				return err
			}
			return list.Add(entry)
		})
	}
	if bucket == "" {
		entries, err := f.listBuckets(ctx)
		if err != nil {
			return err
		}
		for _, entry := range entries {
			err = list.Add(entry)
			if err != nil {
				return err
			}
			bucket := entry.Remote()
			err = listR(bucket, "", f.rootDirectory, true)
			if err != nil {
				return err
			}
			// bucket must be present if listing succeeded
			f.cache.MarkOK(bucket)
		}
	} else {
		err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "")
		if err != nil {
			return err
		}
		// bucket must be present if listing succeeded
		f.cache.MarkOK(bucket)
	}
	return list.Flush()
}

// Mkdir creates the bucket if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	bucket, _ := f.split(dir)
	return f.makeBucket(ctx, bucket)
}

// makeBucket creates the bucket if it doesn't exist
func (f *Fs) makeBucket(ctx context.Context, bucket string) error {
	return f.cache.Create(bucket, func() error {
		bucketInit, err := f.svc.Bucket(bucket, f.zone)
		if err != nil {
			return err
		}
		/* When delete a bucket, qingstor need about 60 second to sync status;
		So, need wait for it sync end if we try to operation a just deleted bucket
		*/
		wasDeleted := false
		retries := 0
		for retries <= 120 {
			statistics, err := bucketInit.GetStatistics()
			if statistics == nil || err != nil {
				break
			}
			switch *statistics.Status {
			case "deleted":
				fs.Debugf(f, "Wait for qingstor bucket to be deleted, retries: %d", retries)
				time.Sleep(time.Second * 1)
				retries++
				wasDeleted = true
				continue
			}
			break
		}

		retries = 0
		for retries <= 120 {
			_, err = bucketInit.Put()
			if e, ok := err.(*qsErr.QingStorError); ok {
				if e.StatusCode == http.StatusConflict {
					if wasDeleted {
						fs.Debugf(f, "Wait for qingstor bucket to be creatable, retries: %d", retries)
						time.Sleep(time.Second * 1)
						retries++
						continue
					}
					err = nil
				}
			}
			break
		}
		return err
	}, nil)
}

// bucketIsEmpty check if the bucket empty
func (f *Fs) bucketIsEmpty(bucket string) (bool, error) {
	bucketInit, err := f.svc.Bucket(bucket, f.zone)
	if err != nil {
		return true, err
	}

	statistics, err := bucketInit.GetStatistics()
	if err != nil {
		return true, err
	}

	if *statistics.Count == 0 {
		return true, nil
	}
	return false, nil
}

// Rmdir delete a bucket
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	bucket, directory := f.split(dir)
	if bucket == "" || directory != "" {
		return nil
	}
	isEmpty, err := f.bucketIsEmpty(bucket)
	if err != nil {
		return err
	}
	if !isEmpty {
		// fs.Debugf(f, "The bucket %s you tried to delete not empty.", bucket)
		return errors.New("BucketNotEmpty: The bucket you tried to delete is not empty")
	}
	return f.cache.Remove(bucket, func() error {
		// fs.Debugf(f, "Deleting the bucket %s", bucket)
		bucketInit, err := f.svc.Bucket(bucket, f.zone)
		if err != nil {
			return err
		}
		retries := 0
		for retries <= 10 {
			_, delErr := bucketInit.Delete()
			if delErr != nil {
				if e, ok := delErr.(*qsErr.QingStorError); ok {
					switch e.Code {
					// The status of "lease" takes a few seconds to "ready" when creating a new bucket
					// wait for lease status ready
					case "lease_not_ready":
						fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries)
						retries++
						time.Sleep(time.Second * 1)
						continue
					default:
						err = e
					}
				}
			} else {
				err = delErr
			}
			break
		}
		return err
	})
}

// cleanUpBucket removes all pending multipart uploads for a given bucket
func (f *Fs) cleanUpBucket(ctx context.Context, bucket string) (err error) {
	fs.Infof(f, "cleaning bucket %q of pending multipart uploads older than 24 hours", bucket)
	bucketInit, err := f.svc.Bucket(bucket, f.zone)
	if err != nil {
		return err
	}
	// maxLimit := int(listLimitSize)
	var marker *string
	for {
		req := qs.ListMultipartUploadsInput{
			// The default is 200 but this errors if more than 200 is put in so leave at the default
			// Limit:     &maxLimit,
			KeyMarker: marker,
		}
		var resp *qs.ListMultipartUploadsOutput
		resp, err = bucketInit.ListMultipartUploads(&req)
		if err != nil {
			return fmt.Errorf("clean up bucket list multipart uploads: %w", err)
		}
		for _, upload := range resp.Uploads {
			if upload.Created != nil && upload.Key != nil && upload.UploadID != nil {
				age := time.Since(*upload.Created)
				if age > 24*time.Hour {
					fs.Infof(f, "removing pending multipart upload for %q dated %v (%v ago)", *upload.Key, upload.Created, age)
					req := qs.AbortMultipartUploadInput{
						UploadID: upload.UploadID,
					}
					_, abortErr := bucketInit.AbortMultipartUpload(*upload.Key, &req)
					if abortErr != nil {
						err = fmt.Errorf("failed to remove multipart upload for %q: %w", *upload.Key, abortErr)
						fs.Errorf(f, "%v", err)
					}
				} else {
					fs.Debugf(f, "ignoring pending multipart upload for %q dated %v (%v ago)", *upload.Key, upload.Created, age)
				}
			}
		}
		if resp.HasMore != nil && !*resp.HasMore {
			break
		}
		// Use NextMarker if set, otherwise use last Key
		if resp.NextKeyMarker == nil || *resp.NextKeyMarker == "" {
			fs.Errorf(f, "Expecting NextKeyMarker but didn't find one")
			break
		} else {
			marker = resp.NextKeyMarker
		}
	}
	return err
}

// CleanUp removes all pending multipart uploads
func (f *Fs) CleanUp(ctx context.Context) (err error) {
	if f.rootBucket != "" {
		return f.cleanUpBucket(ctx, f.rootBucket)
	}
	entries, err := f.listBuckets(ctx)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		cleanErr := f.cleanUpBucket(ctx, f.opt.Enc.FromStandardName(entry.Remote()))
		if cleanErr != nil {
			fs.Errorf(f, "Failed to cleanup bucket: %q", cleanErr)
			err = cleanErr
		}
	}
	return err
}

// readMetaData gets the metadata if it hasn't already been fetched
//
// it also sets the info
func (o *Object) readMetaData() (err error) {
	bucket, bucketPath := o.split()
	bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
	if err != nil {
		return err
	}
	// fs.Debugf(o, "Read metadata of key: %s", key)
	resp, err := bucketInit.HeadObject(bucketPath, &qs.HeadObjectInput{})
	if err != nil {
		// fs.Debugf(o, "Read metadata failed, API Error: %v", err)
		if e, ok := err.(*qsErr.QingStorError); ok {
			if e.StatusCode == http.StatusNotFound {
				return fs.ErrorObjectNotFound
			}
		}
		return err
	}
	// Ignore missing Content-Length assuming it is 0
	if resp.ContentLength != nil {
		o.size = *resp.ContentLength
	}

	if resp.ETag != nil {
		o.etag = qs.StringValue(resp.ETag)
	}

	if resp.LastModified == nil {
		fs.Logf(o, "Failed to read last modified from HEAD: %v", err)
		o.lastModified = time.Now()
	} else {
		o.lastModified = *resp.LastModified
	}

	if resp.ContentType != nil {
		o.mimeType = qs.StringValue(resp.ContentType)
	}

	if resp.XQSEncryptionCustomerAlgorithm != nil {
		o.algo = qs.StringValue(resp.XQSEncryptionCustomerAlgorithm)
		o.encrypted = true
	}

	return nil
}

// ModTime returns the modification date of the file
// It should return a best guess if one isn't available
func (o *Object) ModTime(ctx context.Context) time.Time {
	err := o.readMetaData()
	if err != nil {
		fs.Logf(o, "Failed to read metadata, %v", err)
		return time.Now()
	}
	modTime := o.lastModified
	return modTime
}

// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
	err := o.readMetaData()
	if err != nil {
		return err
	}
	o.lastModified = modTime
	mimeType := fs.MimeType(ctx, o)

	if o.size >= maxSizeForCopy {
		fs.Debugf(o, "SetModTime is unsupported for objects bigger than %v bytes", fs.SizeSuffix(maxSizeForCopy))
		return nil
	}
	// Copy the object to itself to update the metadata
	bucket, bucketPath := o.split()
	sourceKey := path.Join("/", bucket, bucketPath)

	bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
	if err != nil {
		return err
	}

	req := qs.PutObjectInput{
		XQSCopySource: &sourceKey,
		ContentType:   &mimeType,
	}
	_, err = bucketInit.PutObject(bucketPath, &req)

	return err
}

// Open opens the file for read.  Call Close() on the returned io.ReadCloser
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
	bucket, bucketPath := o.split()
	bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
	if err != nil {
		return nil, err
	}

	req := qs.GetObjectInput{}
	fs.FixRangeOption(options, o.size)
	for _, option := range options {
		switch option.(type) {
		case *fs.RangeOption, *fs.SeekOption:
			_, value := option.Header()
			req.Range = &value
		default:
			if option.Mandatory() {
				fs.Logf(o, "Unsupported mandatory option: %v", option)
			}
		}
	}
	resp, err := bucketInit.GetObject(bucketPath, &req)
	if err != nil {
		return nil, err
	}
	return resp.Body, nil
}

// Update in to the object
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
	// The maximum size of upload object is multipartUploadSize * MaxMultipleParts
	bucket, bucketPath := o.split()
	err := o.fs.makeBucket(ctx, bucket)
	if err != nil {
		return err
	}

	// Guess the content type
	mimeType := fs.MimeType(ctx, src)

	req := uploadInput{
		body:        in,
		qsSvc:       o.fs.svc,
		bucket:      bucket,
		zone:        o.fs.zone,
		key:         bucketPath,
		mimeType:    mimeType,
		partSize:    int64(o.fs.opt.ChunkSize),
		concurrency: o.fs.opt.UploadConcurrency,
	}
	uploader := newUploader(&req)

	size := src.Size()
	multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
	if multipart {
		err = uploader.upload()
	} else {
		err = uploader.singlePartUpload(in, size)
	}
	if err != nil {
		return err
	}
	// Read Metadata of object
	err = o.readMetaData()
	return err
}

// Remove this object
func (o *Object) Remove(ctx context.Context) error {
	bucket, bucketPath := o.split()
	bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
	if err != nil {
		return err
	}
	_, err = bucketInit.DeleteObject(bucketPath)
	return err
}

// Fs returns read only access to the Fs that this object is part of
func (o *Object) Fs() fs.Info {
	return o.fs
}

var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)

// Hash returns the selected checksum of the file
// If no checksum is available it returns ""
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
	if t != hash.MD5 {
		return "", hash.ErrUnsupported
	}
	etag := strings.Trim(strings.ToLower(o.etag), `"`)
	// Check the etag is a valid md5sum
	if !matchMd5.MatchString(etag) {
		fs.Debugf(o, "Invalid md5sum (probably multipart uploaded) - ignoring: %q", etag)
		return "", nil
	}
	return etag, nil
}

// Storable says whether this object can be stored
func (o *Object) Storable() bool {
	return true
}

// String returns a description of the Object
func (o *Object) String() string {
	if o == nil {
		return "<nil>"
	}
	return o.remote
}

// Remote returns the remote path
func (o *Object) Remote() string {
	return o.remote
}

// Size returns the size of the file
func (o *Object) Size() int64 {
	return o.size
}

// MimeType of an Object if known, "" otherwise
func (o *Object) MimeType(ctx context.Context) string {
	err := o.readMetaData()
	if err != nil {
		fs.Logf(o, "Failed to read metadata: %v", err)
		return ""
	}
	return o.mimeType
}

// Check the interfaces are satisfied
var (
	_ fs.Fs         = &Fs{}
	_ fs.CleanUpper = &Fs{}
	_ fs.Copier     = &Fs{}
	_ fs.Object     = &Object{}
	_ fs.ListRer    = &Fs{}
	_ fs.MimeTyper  = &Object{}
)