// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/net/context"

	"cloud.google.com/go/internal/optional"
	bq "google.golang.org/api/bigquery/v2"
)

// A Table is a reference to a BigQuery table.
// It identifies the table by project, dataset and table ID; the methods on
// Table send their requests through the Client held in the unexported field c.
type Table struct {
	// ProjectID, DatasetID and TableID may be omitted if the Table is the destination for a query.
	// In this case the result will be stored in an ephemeral table.
	ProjectID string
	DatasetID string
	// TableID must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_).
	// The maximum length is 1,024 characters.
	TableID string

	// c is the client whose underlying BigQuery service is used for all
	// requests made on behalf of this table.
	c *Client
}

// TableMetadata contains information about a BigQuery table.
// The first group of fields may be supplied when creating a table; the
// second group is filled in only when metadata is read back.
type TableMetadata struct {
	// The following fields can be set when creating a table.

	// The user-friendly name for the table.
	Name string

	// The user-friendly description of the table.
	Description string

	// The table schema. If provided on create, ViewQuery must be empty.
	Schema Schema

	// The query to use for a view. If provided on create, Schema must be nil.
	ViewQuery string

	// Use Legacy SQL for the view query.
	// At most one of UseLegacySQL and UseStandardSQL can be true.
	// Setting it without a ViewQuery is an error on create.
	UseLegacySQL bool

	// Use Standard SQL for the view query. The default: a view is created
	// with Standard SQL unless UseLegacySQL is true.
	// At most one of UseLegacySQL and UseStandardSQL can be true.
	// Deprecated: use UseLegacySQL.
	UseStandardSQL bool

	// If non-nil, the table is partitioned by time.
	TimePartitioning *TimePartitioning

	// The time when this table expires. If not set, the table will persist
	// indefinitely. Expired tables will be deleted and their storage reclaimed.
	// The value is sent to the service with millisecond precision.
	ExpirationTime time.Time

	// User-provided labels.
	Labels map[string]string

	// Information about a table stored outside of BigQuery.
	ExternalDataConfig *ExternalDataConfig

	// All the fields below are read-only. They are populated when metadata is
	// read from the service; setting any of them to a non-zero value in the
	// metadata passed to Table.Create results in an error.

	FullID           string // An opaque ID uniquely identifying the table.
	Type             TableType
	CreationTime     time.Time
	LastModifiedTime time.Time

	// The size of the table in bytes.
	// This does not include data that is being buffered during a streaming insert.
	NumBytes int64

	// The number of rows of data in this table.
	// This does not include data that is being buffered during a streaming insert.
	NumRows uint64

	// Contains information regarding this table's streaming buffer, if one is
	// present. This field will be nil if the table is not being streamed to or if
	// there is no data in the streaming buffer.
	StreamingBuffer *StreamingBuffer

	// ETag is the ETag obtained when reading metadata. Pass it to Table.Update to
	// ensure that the metadata hasn't changed since it was read.
	ETag string
}

// TableCreateDisposition specifies the circumstances under which the
// destination table will be created.
// Default is CreateIfNeeded.
type TableCreateDisposition string

const (
	// CreateIfNeeded will create the table if it does not already exist.
	// Tables are created atomically on successful completion of a job.
	CreateIfNeeded TableCreateDisposition = "CREATE_IF_NEEDED"

	// CreateNever requires that the table already exist; it will not be
	// created automatically.
	CreateNever TableCreateDisposition = "CREATE_NEVER"
)

// TableWriteDisposition specifies how existing data in a destination table is treated.
// Default is WriteAppend.
type TableWriteDisposition string

const (
	// WriteAppend will append to any existing data in the destination table.
	// Data is appended atomically on successful completion of a job.
	WriteAppend TableWriteDisposition = "WRITE_APPEND"

	// WriteTruncate overwrites the existing data in the destination table.
	// Data is overwritten atomically on successful completion of a job.
	WriteTruncate TableWriteDisposition = "WRITE_TRUNCATE"

	// WriteEmpty fails writes if the destination table already contains data.
	WriteEmpty TableWriteDisposition = "WRITE_EMPTY"
)

// TableType is the type of table.
// It is reported in TableMetadata.Type when metadata is read and cannot be
// set directly when creating a table.
type TableType string

const (
	RegularTable  TableType = "TABLE"    // The service's "TABLE" type.
	ViewTable     TableType = "VIEW"     // The service's "VIEW" type, used for tables defined by a view query.
	ExternalTable TableType = "EXTERNAL" // The service's "EXTERNAL" type.
)

// TimePartitioning describes the time-based date partitioning on a table.
// Partitioning is always requested with the "DAY" type.
// For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables.
type TimePartitioning struct {
	// The amount of time to keep the storage for a partition.
	// If the duration is empty (0), the data in the partitions do not expire.
	// The value is sent to the service in whole milliseconds; any
	// sub-millisecond remainder is truncated.
	Expiration time.Duration
}

// toBQ converts p into its API representation. A nil receiver yields nil.
// The partitioning type is always "DAY" and the expiration is expressed in
// whole milliseconds.
func (p *TimePartitioning) toBQ() *bq.TimePartitioning {
	if p == nil {
		return nil
	}
	out := new(bq.TimePartitioning)
	out.Type = "DAY"
	out.ExpirationMs = int64(p.Expiration / time.Millisecond)
	return out
}

// bqToTimePartitioning converts the API representation of time partitioning
// into a TimePartitioning. A nil argument yields nil. The millisecond
// expiration reported by the API is converted to a time.Duration.
func bqToTimePartitioning(q *bq.TimePartitioning) *TimePartitioning {
	if q == nil {
		return nil
	}
	expiration := time.Duration(q.ExpirationMs) * time.Millisecond
	return &TimePartitioning{Expiration: expiration}
}

// StreamingBuffer holds information about the streaming buffer.
// It is read-only: it appears in TableMetadata when metadata is read and
// must not be set when creating a table.
type StreamingBuffer struct {
	// A lower-bound estimate of the number of bytes currently in the streaming
	// buffer.
	EstimatedBytes uint64

	// A lower-bound estimate of the number of rows currently in the streaming
	// buffer.
	EstimatedRows uint64

	// The time of the oldest entry in the streaming buffer.
	OldestEntryTime time.Time
}

// toBQ returns a newly allocated API table reference carrying the project,
// dataset and table IDs of t.
func (t *Table) toBQ() *bq.TableReference {
	ref := new(bq.TableReference)
	ref.ProjectId = t.ProjectID
	ref.DatasetId = t.DatasetID
	ref.TableId = t.TableID
	return ref
}

// FullyQualifiedName returns the ID of the table in projectID:datasetID.tableID format.
// The three IDs are inserted verbatim; no validation or escaping is performed.
func (t *Table) FullyQualifiedName() string {
	const layout = "%s:%s.%s"
	return fmt.Sprintf(layout, t.ProjectID, t.DatasetID, t.TableID)
}

// implicitTable reports whether Table is an empty placeholder, which signifies
// that a new table should be created with an auto-generated Table ID.
// A Table is a placeholder only when all three of its IDs are empty.
func (t *Table) implicitTable() bool {
	if t.ProjectID != "" {
		return false
	}
	if t.DatasetID != "" {
		return false
	}
	return t.TableID == ""
}

// Create creates a table in the BigQuery service.
// Pass in a TableMetadata value to configure the table; a nil tm creates the
// table with no additional configuration.
// If tm.ViewQuery is non-empty, the created table will be of type VIEW.
// ExpirationTime may be set here; it can also be changed later with Update.
// After table creation, a view can be modified only if its table was initially created
// with a view.
//
// Create returns an error, without contacting the service, if tm fails
// validation (for example if both Schema and ViewQuery are set, or if a
// read-only field is populated). Otherwise it returns the error, if any, of
// the insert request.
func (t *Table) Create(ctx context.Context, tm *TableMetadata) error {
	table, err := tm.toBQ()
	if err != nil {
		return err
	}
	// Use the shared conversion so the reference sent here always matches
	// the one used elsewhere for this table.
	table.TableReference = t.toBQ()
	req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx)
	setClientHeader(req.Header())
	_, err = req.Do()
	return err
}

// toBQ converts tm into the API table representation used when creating a
// table. A nil receiver yields an empty table and no error.
//
// It returns an error when:
//   - both Schema and ViewQuery are provided;
//   - both UseStandardSQL and UseLegacySQL are set;
//   - UseLegacySQL or UseStandardSQL is set without a ViewQuery;
//   - any read-only field (FullID, Type, CreationTime, LastModifiedTime,
//     NumBytes, NumRows, StreamingBuffer, ETag) is set.
func (tm *TableMetadata) toBQ() (*bq.Table, error) {
	out := &bq.Table{}
	if tm == nil {
		return out, nil
	}

	hasSchema := tm.Schema != nil
	hasView := tm.ViewQuery != ""
	if hasSchema && hasView {
		return nil, errors.New("bigquery: provide Schema or ViewQuery, not both")
	}

	out.FriendlyName = tm.Name
	out.Description = tm.Description
	out.Labels = tm.Labels
	if hasSchema {
		out.Schema = tm.Schema.toBQ()
	}

	switch {
	case hasView:
		if tm.UseStandardSQL && tm.UseLegacySQL {
			return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
		}
		view := &bq.ViewDefinition{
			Query:        tm.ViewQuery,
			UseLegacySql: tm.UseLegacySQL,
		}
		if !tm.UseLegacySQL {
			// Standard SQL is the default here. false is the zero value of
			// UseLegacySql, so the field is listed in ForceSendFields to
			// request that it be sent explicitly.
			view.ForceSendFields = append(view.ForceSendFields, "UseLegacySql")
		}
		out.View = view
	case tm.UseLegacySQL || tm.UseStandardSQL:
		return nil, errors.New("bigquery: UseLegacy/StandardSQL requires ViewQuery")
	}

	out.TimePartitioning = tm.TimePartitioning.toBQ()
	if !tm.ExpirationTime.IsZero() {
		// The API expects milliseconds since the Unix epoch.
		out.ExpirationTime = tm.ExpirationTime.UnixNano() / 1e6
	}
	if cfg := tm.ExternalDataConfig; cfg != nil {
		edc := cfg.toBQ()
		out.ExternalDataConfiguration = &edc
	}

	// Read-only fields must be left at their zero values on create. The
	// cases are checked in declaration order and the first offender wins.
	switch {
	case tm.FullID != "":
		return nil, errors.New("cannot set FullID on create")
	case tm.Type != "":
		return nil, errors.New("cannot set Type on create")
	case !tm.CreationTime.IsZero():
		return nil, errors.New("cannot set CreationTime on create")
	case !tm.LastModifiedTime.IsZero():
		return nil, errors.New("cannot set LastModifiedTime on create")
	case tm.NumBytes != 0:
		return nil, errors.New("cannot set NumBytes on create")
	case tm.NumRows != 0:
		return nil, errors.New("cannot set NumRows on create")
	case tm.StreamingBuffer != nil:
		return nil, errors.New("cannot set StreamingBuffer on create")
	case tm.ETag != "":
		return nil, errors.New("cannot set ETag on create")
	}
	return out, nil
}

// Metadata fetches the metadata for the table.
// The get request is issued through runWithRetry; on success the API
// response is converted to a TableMetadata.
func (t *Table) Metadata(ctx context.Context) (*TableMetadata, error) {
	call := t.c.bqs.Tables.Get(t.ProjectID, t.DatasetID, t.TableID).Context(ctx)
	setClientHeader(call.Header())

	var res *bq.Table
	fetch := func() error {
		var err error
		res, err = call.Do()
		return err
	}
	if err := runWithRetry(ctx, fetch); err != nil {
		return nil, err
	}
	return bqToTableMetadata(res)
}

// bqToTableMetadata converts an API table into a TableMetadata.
// Timestamps reported by the API in Unix milliseconds are converted with
// unixMillisToTime. Schema, view, streaming-buffer and external-data
// information is copied only when present in t. An error is returned only if
// the external data configuration cannot be converted.
func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
	md := new(TableMetadata)

	// Scalar fields copied straight from the API response.
	md.Name = t.FriendlyName
	md.Description = t.Description
	md.Type = TableType(t.Type)
	md.FullID = t.Id
	md.Labels = t.Labels
	md.NumBytes = t.NumBytes
	md.NumRows = t.NumRows
	md.ETag = t.Etag

	// Timestamps.
	md.ExpirationTime = unixMillisToTime(t.ExpirationTime)
	md.CreationTime = unixMillisToTime(t.CreationTime)
	md.LastModifiedTime = unixMillisToTime(int64(t.LastModifiedTime))

	if t.Schema != nil {
		md.Schema = bqToSchema(t.Schema)
	}
	if view := t.View; view != nil {
		md.ViewQuery = view.Query
		md.UseLegacySQL = view.UseLegacySql
	}
	md.TimePartitioning = bqToTimePartitioning(t.TimePartitioning)
	if sb := t.StreamingBuffer; sb != nil {
		md.StreamingBuffer = &StreamingBuffer{
			EstimatedBytes:  sb.EstimatedBytes,
			EstimatedRows:   sb.EstimatedRows,
			OldestEntryTime: unixMillisToTime(int64(sb.OldestEntryTime)),
		}
	}

	if t.ExternalDataConfiguration == nil {
		return md, nil
	}
	edc, err := bqToExternalDataConfig(t.ExternalDataConfiguration)
	if err != nil {
		return nil, err
	}
	md.ExternalDataConfig = edc
	return md, nil
}

// Delete deletes the table.
// It issues a single delete request and returns that request's error, if any.
func (t *Table) Delete(ctx context.Context) error {
	call := t.c.bqs.Tables.Delete(t.ProjectID, t.DatasetID, t.TableID).Context(ctx)
	setClientHeader(call.Header())
	err := call.Do()
	return err
}

// Read fetches the contents of the table.
// It returns a RowIterator that obtains rows using fetchPage.
func (t *Table) Read(ctx context.Context) *RowIterator {
	var fetcher pageFetcher = fetchPage
	return t.read(ctx, fetcher)
}

// read returns a RowIterator over the table that uses pf to fetch pages.
// Taking the page fetcher as a parameter lets callers within the package
// substitute their own.
func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator {
	it := newRowIterator(ctx, t, pf)
	return it
}

// Update modifies specific Table metadata fields.
// Only the fields set in tm are sent, using a patch request. If etag is
// non-empty it is sent in an If-Match header. The patch is issued through
// runWithRetry, and on success the metadata returned by the service is
// converted and returned.
func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (*TableMetadata, error) {
	patch := t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, tm.toBQ()).Context(ctx)
	setClientHeader(patch.Header())
	if etag != "" {
		patch.Header().Set("If-Match", etag)
	}

	var updated *bq.Table
	err := runWithRetry(ctx, func() error {
		var callErr error
		updated, callErr = patch.Do()
		return callErr
	})
	if err != nil {
		return nil, err
	}
	return bqToTableMetadata(updated)
}

// toBQ converts tm into the API table sent in a patch request.
// Only fields that are set in tm are populated. Each populated top-level
// field is also listed in ForceSendFields, and likewise for the fields of the
// view definition. Label additions and deletions gathered through SetLabel
// and DeleteLabel are merged in last.
func (tm *TableMetadataToUpdate) toBQ() *bq.Table {
	out := &bq.Table{}

	if tm.Description != nil {
		out.Description = optional.ToString(tm.Description)
		out.ForceSendFields = append(out.ForceSendFields, "Description")
	}
	if tm.Name != nil {
		out.FriendlyName = optional.ToString(tm.Name)
		out.ForceSendFields = append(out.ForceSendFields, "FriendlyName")
	}
	if tm.Schema != nil {
		out.Schema = tm.Schema.toBQ()
		out.ForceSendFields = append(out.ForceSendFields, "Schema")
	}
	if !tm.ExpirationTime.IsZero() {
		// The API expects milliseconds since the Unix epoch.
		out.ExpirationTime = tm.ExpirationTime.UnixNano() / 1e6
		out.ForceSendFields = append(out.ForceSendFields, "ExpirationTime")
	}

	// A view definition is attached when either view-related field is set.
	if tm.ViewQuery != nil || tm.UseLegacySQL != nil {
		view := &bq.ViewDefinition{}
		if tm.ViewQuery != nil {
			view.Query = optional.ToString(tm.ViewQuery)
			view.ForceSendFields = append(view.ForceSendFields, "Query")
		}
		if tm.UseLegacySQL != nil {
			view.UseLegacySql = optional.ToBool(tm.UseLegacySQL)
			view.ForceSendFields = append(view.ForceSendFields, "UseLegacySql")
		}
		out.View = view
	}

	labels, forces, nulls := tm.update()
	out.Labels = labels
	out.ForceSendFields = append(out.ForceSendFields, forces...)
	out.NullFields = append(out.NullFields, nulls...)
	return out
}

// TableMetadataToUpdate is used when updating a table's metadata.
// Only non-nil fields will be updated. ExpirationTime is a plain time.Time,
// so it is updated only when it is not the zero time. Labels are changed
// with the SetLabel and DeleteLabel methods.
type TableMetadataToUpdate struct {
	// The user-friendly description of this table.
	Description optional.String

	// The user-friendly name for this table.
	Name optional.String

	// The table's schema.
	// When updating a schema, you can add columns but not remove them.
	Schema Schema

	// The time when this table expires.
	// The zero value leaves the expiration unchanged.
	ExpirationTime time.Time

	// The query to use for a view.
	ViewQuery optional.String

	// Use Legacy SQL for the view query.
	UseLegacySQL optional.Bool

	// labelUpdater records the label changes requested through SetLabel and
	// DeleteLabel.
	labelUpdater
}

// labelUpdater contains common code for updating labels.
// It records the labels to set and the labels to delete until update is
// called to turn them into the pieces of an API request.
type labelUpdater struct {
	setLabels    map[string]string
	deleteLabels map[string]bool
}

// SetLabel causes a label to be added or modified on a call to Update.
// Setting the same name again replaces the earlier value.
func (u *labelUpdater) SetLabel(name, value string) {
	if u.setLabels != nil {
		u.setLabels[name] = value
		return
	}
	// First call: create the map on demand.
	u.setLabels = map[string]string{name: value}
}

// DeleteLabel causes a label to be deleted on a call to Update.
func (u *labelUpdater) DeleteLabel(name string) {
	if u.deleteLabels != nil {
		u.deleteLabels[name] = true
		return
	}
	// First call: create the map on demand.
	u.deleteLabels = map[string]bool{name: true}
}

// update translates the recorded label changes into request pieces.
//
// If neither SetLabel nor DeleteLabel was ever called, all three results are
// nil. Otherwise labels is a fresh copy of the labels to set (possibly
// empty), nulls holds one "Labels.<name>" entry per label to delete (in
// unspecified order, since it comes from map iteration), and forces is
// ["Labels"] only when there are deletions but no labels to set.
func (u *labelUpdater) update() (labels map[string]string, forces, nulls []string) {
	if u.setLabels == nil && u.deleteLabels == nil {
		return nil, nil, nil
	}
	labels = make(map[string]string, len(u.setLabels))
	for name, value := range u.setLabels {
		labels[name] = value
	}
	for name := range u.deleteLabels {
		nulls = append(nulls, "Labels."+name)
	}
	if len(labels) == 0 && len(nulls) > 0 {
		forces = []string{"Labels"}
	}
	return labels, forces, nulls
}