package swift

import (
	"bufio"
	"bytes"
	"crypto/rand"
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os"
	gopath "path"
	"strconv"
	"strings"
	"time"
)

// NotLargeObject is returned if an operation is performed on an object which isn't large.
var NotLargeObject = errors.New("Not a large object")

// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
var readAfterWriteTimeout = 15 * time.Second

// readAfterWriteWait defines the time to sleep between two retries
var readAfterWriteWait = 200 * time.Millisecond

// largeObjectCreateFile represents an open static or dynamic large object
type largeObjectCreateFile struct {
	conn             *Connection
	container        string
	objectName       string
	currentLength    int64
	filePos          int64
	chunkSize        int64
	segmentContainer string
	prefix           string
	contentType      string
	checkHash        bool
	segments         []Object
	headers          Headers
	minChunkSize     int64
}

func swiftSegmentPath(path string) (string, error) {
	checksum := sha1.New()
	random := make([]byte, 32)
	if _, err := rand.Read(random); err != nil {
		return "", err
	}
	path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
	return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
}

func getSegment(segmentPath string, partNumber int) string {
	return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
}

func parseFullPath(manifest string) (container string, prefix string) {
	components := strings.SplitN(manifest, "/", 2)
	container = components[0]
	if len(components) > 1 {
		prefix = components[1]
	}
	return container, prefix
}

func (headers Headers) IsLargeObjectDLO() bool {
	_, isDLO := headers["X-Object-Manifest"]
	return isDLO
}

func (headers Headers) IsLargeObjectSLO() bool {
	_, isSLO := headers["X-Static-Large-Object"]
	return isSLO
}

func (headers Headers) IsLargeObject() bool {
	return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
}

func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
	if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
		segmentContainer, segmentPath := parseFullPath(manifest)
		segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
		return segmentContainer, segments, err
	}
	if headers.IsLargeObjectSLO() {
		return c.getAllSLOSegments(container, path)
	}
	return "", nil, NotLargeObject
}

// LargeObjectOpts describes how a large object should be created
type LargeObjectOpts struct {
	Container        string  // Name of container to place object
	ObjectName       string  // Name of object
	Flags            int     // Creation flags
	CheckHash        bool    // If set Check the hash
	Hash             string  // If set use this hash to check
	ContentType      string  // Content-Type of the object
	Headers          Headers // Additional headers to upload the object with
	ChunkSize        int64   // Size of chunks of the object, defaults to 10MB if not set
	MinChunkSize     int64   // Minimum chunk size, automatically set for SLO's based on info
	SegmentContainer string  // Name of the container to place segments
	SegmentPrefix    string  // Prefix to use for the segments
	NoBuffer         bool    // Prevents using a bufio.Writer to write segments
}

type LargeObjectFile interface {
	io.Writer
	io.Seeker
	io.Closer
	Size() int64
	Flush() error
}

// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
//
// opts.Flags can have the following bits set
//   os.TRUNC  - remove the contents of the large object if it exists
//   os.APPEND - write at the end of the large object
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
	var (
		segmentPath      string
		segmentContainer string
		segments         []Object
		currentLength    int64
		err              error
	)

	if opts.SegmentPrefix != "" {
		segmentPath = opts.SegmentPrefix
	} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
		return nil, err
	}

	if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
		if opts.Flags&os.O_TRUNC != 0 {
			c.LargeObjectDelete(opts.Container, opts.ObjectName)
		} else {
			currentLength = info.Bytes
			if headers.IsLargeObject() {
				segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
				if err != nil {
					return nil, err
				}
				if len(segments) > 0 {
					segmentPath = gopath.Dir(segments[0].Name)
				}
			} else {
				if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
					return nil, err
				}
				segments = append(segments, info)
			}
		}
	} else if err != ObjectNotFound {
		return nil, err
	}

	// segmentContainer is not empty when the manifest already existed
	if segmentContainer == "" {
		if opts.SegmentContainer != "" {
			segmentContainer = opts.SegmentContainer
		} else {
			segmentContainer = opts.Container + "_segments"
		}
	}

	file := &largeObjectCreateFile{
		conn:             c,
		checkHash:        opts.CheckHash,
		container:        opts.Container,
		objectName:       opts.ObjectName,
		chunkSize:        opts.ChunkSize,
		minChunkSize:     opts.MinChunkSize,
		headers:          opts.Headers,
		segmentContainer: segmentContainer,
		prefix:           segmentPath,
		segments:         segments,
		currentLength:    currentLength,
	}

	if file.chunkSize == 0 {
		file.chunkSize = 10 * 1024 * 1024
	}

	if file.minChunkSize > file.chunkSize {
		file.chunkSize = file.minChunkSize
	}

	if opts.Flags&os.O_APPEND != 0 {
		file.filePos = currentLength
	}

	return file, nil
}

// LargeObjectDelete deletes the large object named by container, path
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
	_, headers, err := c.Object(container, objectName)
	if err != nil {
		return err
	}

	var objects [][]string
	if headers.IsLargeObject() {
		segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
		if err != nil {
			return err
		}
		for _, obj := range segments {
			objects = append(objects, []string{segmentContainer, obj.Name})
		}
	}
	objects = append(objects, []string{container, objectName})

	info, err := c.cachedQueryInfo()
	if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
		filenames := make([]string, len(objects))
		for i, obj := range objects {
			filenames[i] = obj[0] + "/" + obj[1]
		}
		_, err = c.doBulkDelete(filenames)
		// Don't fail on ObjectNotFound because eventual consistency
		// makes this situation normal.
		if err != nil && err != Forbidden && err != ObjectNotFound {
			return err
		}
	} else {
		for _, obj := range objects {
			if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
				return err
			}
		}
	}

	return nil
}

// LargeObjectGetSegments returns all the segments that compose an object
// If the object is a Dynamic Large Object (DLO), it just returns the objects
// that have the prefix as indicated by the manifest.
// If the object is a Static Large Object (SLO), it retrieves the JSON content
// of the manifest and return all the segments of it.
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
	_, headers, err := c.Object(container, path)
	if err != nil {
		return "", nil, err
	}

	return c.getAllSegments(container, path, headers)
}

// Seek sets the offset for the next write operation
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case 0:
		file.filePos = offset
	case 1:
		file.filePos += offset
	case 2:
		file.filePos = file.currentLength + offset
	default:
		return -1, fmt.Errorf("invalid value for whence")
	}
	if file.filePos < 0 {
		return -1, fmt.Errorf("negative offset")
	}
	return file.filePos, nil
}

func (file *largeObjectCreateFile) Size() int64 {
	return file.currentLength
}

func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
	endTimer := time.NewTimer(readAfterWriteTimeout)
	defer endTimer.Stop()
	waitingTime := readAfterWriteWait
	for {
		var headers Headers
		var sz int64
		if headers, sz, err = fn(); err == nil {
			if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
				return
			}
		} else {
			return
		}
		waitTimer := time.NewTimer(waitingTime)
		select {
		case <-endTimer.C:
			waitTimer.Stop()
			err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
			return
		case <-waitTimer.C:
			waitingTime *= 2
		}
	}
}

func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
	err = withLORetry(expectedSize, func() (Headers, int64, error) {
		var info Object
		var headers Headers
		info, headers, err = c.objectBase(container, objectName)
		if err != nil {
			return headers, 0, err
		}
		return headers, info.Bytes, nil
	})
	return
}

// Write satisfies the io.Writer interface
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
	var sz int64
	var relativeFilePos int
	writeSegmentIdx := 0
	for i, obj := range file.segments {
		if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
			relativeFilePos = int(file.filePos - sz)
			break
		}
		writeSegmentIdx++
		sz += obj.Bytes
	}
	sizeToWrite := len(buf)
	for offset := 0; offset < sizeToWrite; {
		newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
		if err != nil {
			return 0, err
		}
		if writeSegmentIdx < len(file.segments) {
			file.segments[writeSegmentIdx] = *newSegment
		} else {
			file.segments = append(file.segments, *newSegment)
		}
		offset += n
		writeSegmentIdx++
		relativeFilePos = 0
	}
	file.filePos += int64(sizeToWrite)
	file.currentLength = 0
	for _, obj := range file.segments {
		file.currentLength += obj.Bytes
	}
	return sizeToWrite, nil
}

func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
	var (
		readers         []io.Reader
		existingSegment *Object
		segmentSize     int
	)
	segmentName := getSegment(file.prefix, writeSegmentIdx+1)
	sizeToRead := int(file.chunkSize)
	if writeSegmentIdx < len(file.segments) {
		existingSegment = &file.segments[writeSegmentIdx]
		if writeSegmentIdx != len(file.segments)-1 {
			sizeToRead = int(existingSegment.Bytes)
		}
		if relativeFilePos > 0 {
			headers := make(Headers)
			headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
			existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
			if err != nil {
				return nil, 0, err
			}
			defer existingSegmentReader.Close()
			sizeToRead -= relativeFilePos
			segmentSize += relativeFilePos
			readers = []io.Reader{existingSegmentReader}
		}
	}
	if sizeToRead > len(buf) {
		sizeToRead = len(buf)
	}
	segmentSize += sizeToRead
	readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
	if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
		headers := make(Headers)
		headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
		tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
		if err != nil {
			return nil, 0, err
		}
		defer tailSegmentReader.Close()
		segmentSize = int(existingSegment.Bytes)
		readers = append(readers, tailSegmentReader)
	}
	segmentReader := io.MultiReader(readers...)
	headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
	if err != nil {
		return nil, 0, err
	}
	return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
}

func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
	if !opts.NoBuffer {
		return &bufferedLargeObjectFile{
			LargeObjectFile: lo,
			bw:              bufio.NewWriterSize(lo, int(opts.ChunkSize)),
		}
	}
	return lo
}

type bufferedLargeObjectFile struct {
	LargeObjectFile
	bw *bufio.Writer
}

func (blo *bufferedLargeObjectFile) Close() error {
	err := blo.bw.Flush()
	if err != nil {
		return err
	}
	return blo.LargeObjectFile.Close()
}

func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
	return blo.bw.Write(p)
}

func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
	err := blo.bw.Flush()
	if err != nil {
		return 0, err
	}
	return blo.LargeObjectFile.Seek(offset, whence)
}

func (blo *bufferedLargeObjectFile) Size() int64 {
	return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
}

func (blo *bufferedLargeObjectFile) Flush() error {
	err := blo.bw.Flush()
	if err != nil {
		return err
	}
	return blo.LargeObjectFile.Flush()
}