package layer

import (
	"context"
	"io"
	"time"

	"github.com/nspcc-dev/neofs-api-go/object"
	"github.com/nspcc-dev/neofs-api-go/query"
	"github.com/nspcc-dev/neofs-api-go/refs"
	"github.com/nspcc-dev/neofs-api-go/service"
	"github.com/nspcc-dev/neofs-api-go/storagegroup"
	"github.com/nspcc-dev/neofs-s3-gate/api/pool"
	"github.com/pkg/errors"
	"go.uber.org/zap"
)

const (
	dataChunkSize = 3 * object.UnitsMB
	objectVersion = 1
)

type (
	putParams struct {
		addr        refs.Address
		name        string
		size        int64
		r           io.Reader
		userHeaders map[string]string
	}

	sgParams struct {
		addr    refs.Address
		objects []refs.ObjectID
	}

	delParams struct {
		addr refs.Address
	}

	getParams struct {
		addr   refs.Address
		start  int64
		length int64
		writer io.Writer
	}
)

// objectSearchContainer returns all available objects in the container.
func (n *layer) objectSearchContainer(ctx context.Context, cid refs.CID) ([]refs.ObjectID, error) {
	var q query.Query
	q.Filters = append(q.Filters, query.Filter{
		Type: query.Filter_Exact,
		Name: object.KeyRootObject,
	})

	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return nil, err
	}

	queryBinary, err := q.Marshal()
	if err != nil {
		return nil, err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: refs.Address{CID: cid},
		Verb: service.Token_Info_Search,
	})
	if err != nil {
		return nil, err
	}

	req := new(object.SearchRequest)
	req.Query = queryBinary
	req.QueryVersion = 1
	req.ContainerID = cid
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		return nil, err
	}

	// todo: think about timeout
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	searchClient, err := object.NewServiceClient(conn).Search(ctx, req)
	if err != nil {
		return nil, err
	}

	var (
		response []refs.Address
		result   []refs.ObjectID
	)

	for {
		resp, err := searchClient.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}

			return nil, errors.New("search command received error")
		}

		response = append(response, resp.Addresses...)
	}

	for i := range response {
		result = append(result, response[i].ObjectID)
	}

	return result, nil
}

// objectFindIDs returns object id's (uuid) based on they nice name in s3. If
// nice name is uuid compatible, then function returns it.
func (n *layer) objectFindIDs(ctx context.Context, cid refs.CID, name string) ([]refs.ObjectID, error) {
	var q query.Query

	q.Filters = append(q.Filters, query.Filter{
		Type: query.Filter_Exact,
		Name: object.KeyRootObject,
	})
	q.Filters = append(q.Filters, query.Filter{
		Type:  query.Filter_Exact,
		Name:  AWS3NameHeader,
		Value: name,
	})

	queryBinary, err := q.Marshal()
	if err != nil {
		return nil, err
	}

	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return nil, err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: refs.Address{CID: cid},
		Verb: service.Token_Info_Search,
	})
	if err != nil {
		return nil, err
	}

	req := new(object.SearchRequest)
	req.Query = queryBinary
	req.QueryVersion = 1
	req.ContainerID = cid
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		return nil, err
	}

	// todo: think about timeout
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	searchClient, err := object.NewServiceClient(conn).Search(ctx, req)
	if err != nil {
		return nil, err
	}

	var response []refs.Address

	for {
		resp, err := searchClient.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}

			return nil, errors.New("search command received error")
		}

		response = append(response, resp.Addresses...)
	}

	switch ln := len(response); {
	case ln > 0:
		result := make([]refs.ObjectID, 0, len(response))
		for i := range response {
			result = append(result, response[i].ObjectID)
		}

		return result, nil
	default:
		return nil, errors.New("object not found")
	}
}

// objectFindID returns object id (uuid) based on it's nice name in s3. If
// nice name is uuid compatible, then function returns it.
func (n *layer) objectFindID(ctx context.Context, cid refs.CID, name string, put bool) (refs.ObjectID, error) {
	var id refs.ObjectID

	if result, err := n.objectFindIDs(ctx, cid, name); err != nil {
		return id, err
	} else if ln := len(result); ln == 0 {
		// Minio lists all objects with and without nice names. All objects
		// without nice name still have "name" in terms of minio - uuid encoded
		// into string. There is a tricky case when user upload object
		// with nice name that is encoded uuid.
		// There is an optimisation to parse name and return uuid if it name is uuid
		// compatible. It _should not_ work in case of put operation, because object
		// with uuid compatible nice name may not exist. Therefore this optimization
		// breaks object put logic and must be turned off.
		if !put {
			err := id.Parse(name)
			if err == nil {
				return id, nil
			}
		}
		return id, errors.New("object not found")
	} else if ln == 1 {
		return result[0], nil
	}

	return id, errors.New("several objects with the same name found")
}

// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, addr refs.Address) (*object.Object, error) {

	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return nil, err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: addr,
		Verb: service.Token_Info_Head,
	})
	if err != nil {
		return nil, err
	}

	req := new(object.HeadRequest)
	req.Address = addr
	req.FullHeaders = true
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		return nil, err
	}

	// todo: think about timeout
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	res, err := object.NewServiceClient(conn).Head(ctx, req)
	if err != nil {
		return nil, err
	}

	return res.Object, nil
}

func receiveObject(cli object.Service_GetClient) (*object.Object, error) {
	var (
		off int
		buf []byte
		obj *object.Object
	)

	for {
		resp, err := cli.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}

			return nil, err
		}

		switch o := resp.R.(type) {
		case *object.GetResponse_Object:
			if obj != nil {
				return nil, errors.New("object headers already received")
			} else if _, hdr := o.Object.LastHeader(object.HeaderType(object.TombstoneHdr)); hdr != nil {
				return nil, errors.New("object already removed")
			}

			obj = o.Object
			buf = make([]byte, obj.SystemHeader.PayloadLength)

			if len(obj.Payload) > 0 {
				off += copy(buf, obj.Payload)
			}
		case *object.GetResponse_Chunk:
			if obj == nil {
				return nil, errors.New("object headers not received")
			}
			off += copy(buf[off:], o.Chunk)
		default:
			return nil, errors.Errorf("unknown response %T", o)
		}
	}

	if obj == nil {
		return nil, errors.New("object headers not received")
	}
	obj.Payload = buf

	return obj, nil
}

// objectGet and write it into provided io.Reader.
func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, error) {
	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return nil, err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: p.addr,
		Verb: service.Token_Info_Get,
	})
	if err != nil {
		return nil, err
	}

	// todo: replace object.Get() call by object.GetRange() for
	//       true sequential reading support; it will be possible when
	//       object.GetRange() response message become gRPC stream.
	req := new(object.GetRequest)
	req.Address = p.addr
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		return nil, err
	}

	// todo: think about timeout
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var obj *object.Object

	if cli, err := object.NewServiceClient(conn).Get(ctx, req); err != nil {
		return nil, err
	} else if obj, err = receiveObject(cli); err != nil {
		return nil, err
	} else if ln := int64(obj.SystemHeader.PayloadLength); p.start+p.length > ln {
		return nil, errors.Errorf("slice bounds out of range: len = %d, start = %d, offset = %d",
			ln, p.start, p.length)
	} else if _, err = p.writer.Write(obj.Payload[p.start : p.start+p.length]); err != nil {
		return nil, err
	}

	// remove payload:
	obj.Payload = nil

	return obj, nil
}

// objectPut into neofs, took payload from io.Reader.
func (n *layer) objectPut(ctx context.Context, p putParams) (*object.Object, error) {
	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return nil, err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: p.addr,
		Verb: service.Token_Info_Put,
	})
	if err != nil {
		n.log.Error("could not prepare token",
			zap.Error(err))
		return nil, err
	}

	putClient, err := object.NewServiceClient(conn).Put(ctx)
	if err != nil {
		n.log.Error("could not prepare PutClient",
			zap.Error(err))
		return nil, err
	}

	if p.userHeaders == nil {
		p.userHeaders = make(map[string]string)
	}

	// Set object name if not set before
	if _, ok := p.userHeaders[AWS3NameHeader]; !ok {
		p.userHeaders[AWS3NameHeader] = p.name
	}

	readBuffer := make([]byte, dataChunkSize)
	obj := &object.Object{
		SystemHeader: object.SystemHeader{
			Version:       objectVersion,
			ID:            p.addr.ObjectID,
			OwnerID:       n.uid,
			CID:           p.addr.CID,
			PayloadLength: uint64(p.size),
		},
		Headers: parseUserHeaders(p.userHeaders),
	}

	req := object.MakePutRequestHeader(obj)
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		n.log.Error("could not prepare request",
			zap.Error(err))
		return nil, err
	}

	err = putClient.Send(req)
	if err != nil {
		n.log.Error("could not send request",
			zap.Error(err))
		return nil, err
	}

	read, err := p.r.Read(readBuffer)
	for read > 0 {
		if err != nil && err != io.EOF {
			n.log.Error("something went wrong",
				zap.Error(err))
			return nil, err
		}

		if read > 0 {
			req := object.MakePutRequestChunk(readBuffer[:read])
			req.SetTTL(service.SingleForwardingTTL)
			// req.SetBearer(bearerToken)

			err = service.SignRequestData(n.key, req)
			if err != nil {
				n.log.Error("could not sign chunk request",
					zap.Error(err))
				return nil, err
			}

			err = putClient.Send(req)
			if err != nil && err != io.EOF {
				n.log.Error("could not send chunk",
					zap.Error(err))
				return nil, err
			}
		}

		read, err = p.r.Read(readBuffer)
	}

	_, err = putClient.CloseAndRecv()
	if err != nil {
		n.log.Error("could not finish request",
			zap.Error(err))
		return nil, err
	}

	// maybe make a head?
	return obj, nil
}

// storageGroupPut prepares storage group object and put it into neofs.
func (n *layer) storageGroupPut(ctx context.Context, p sgParams) (*object.Object, error) {
	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return nil, err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: p.addr,
		Verb: service.Token_Info_Put,
	})
	if err != nil {
		return nil, err
	}

	// todo: think about timeout
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	putClient, err := object.NewServiceClient(conn).Put(ctx)
	if err != nil {
		return nil, err
	}

	sg := &object.Object{
		SystemHeader: object.SystemHeader{
			Version: objectVersion,
			ID:      p.addr.ObjectID,
			OwnerID: n.uid,
			CID:     p.addr.CID,
		},
		Headers: make([]object.Header, 0, len(p.objects)),
	}

	for i := range p.objects {
		sg.AddHeader(&object.Header{Value: &object.Header_Link{
			Link: &object.Link{Type: object.Link_StorageGroup, ID: p.objects[i]},
		}})
	}

	sg.SetStorageGroup(new(storagegroup.StorageGroup))

	req := object.MakePutRequestHeader(sg)
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		return nil, err
	}

	err = putClient.Send(req)
	if err != nil {
		return nil, err
	}

	_, err = putClient.CloseAndRecv()
	if err != nil {
		return nil, err
	}

	return sg, nil
}

// objectDelete puts tombstone object into neofs.
func (n *layer) objectDelete(ctx context.Context, p delParams) error {
	conn, err := n.cli.GetConnection(ctx)
	if err != nil {
		return err
	}

	token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
		Conn: conn,
		Addr: p.addr,
		Verb: service.Token_Info_Delete,
	})
	if err != nil {
		return err
	}

	req := new(object.DeleteRequest)
	req.Address = p.addr
	req.OwnerID = n.uid
	req.SetTTL(service.SingleForwardingTTL)
	req.SetToken(token)
	// req.SetBearer(bearerToken)

	err = service.SignRequestData(n.key, req)
	if err != nil {
		return err
	}

	// todo: think about timeout
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	_, err = object.NewServiceClient(conn).Delete(ctx, req)

	return err
}