215 lines
6.6 KiB
Go
215 lines
6.6 KiB
Go
// Copyright 2015 Google Inc. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package bigquery
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
|
|
"golang.org/x/net/context"
|
|
bq "google.golang.org/api/bigquery/v2"
|
|
"google.golang.org/api/iterator"
|
|
)
|
|
|
|
func newRowIterator(ctx context.Context, t *Table, pf pageFetcher) *RowIterator {
|
|
it := &RowIterator{
|
|
ctx: ctx,
|
|
table: t,
|
|
pf: pf,
|
|
}
|
|
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
|
|
it.fetch,
|
|
func() int { return len(it.rows) },
|
|
func() interface{} { r := it.rows; it.rows = nil; return r })
|
|
return it
|
|
}
|
|
|
|
// A RowIterator provides access to the result of a BigQuery lookup.
type RowIterator struct {
	ctx      context.Context    // passed to pf on every page fetch
	table    *Table             // passed to pf on every page fetch
	pf       pageFetcher        // called by fetch to retrieve one page of rows
	pageInfo *iterator.PageInfo // returned by PageInfo; created by iterator.NewPageInfo
	nextFunc func() error       // called by Next before it reads a row; created by iterator.NewPageInfo

	// StartIndex can be set before the first call to Next. If PageInfo().Token
	// is also set, StartIndex is ignored.
	StartIndex uint64

	// The schema of the table. Available after the first call to Next.
	Schema Schema

	// The total number of rows in the result. Available after the first call to Next.
	// May be zero just after rows were inserted.
	TotalRows uint64

	rows [][]Value // rows fetched but not yet returned by Next

	structLoader structLoader // used to populate a pointer to a struct
}
|
|
|
|
// Next loads the next row into dst. Its return value is iterator.Done if there
|
|
// are no more results. Once Next returns iterator.Done, all subsequent calls
|
|
// will return iterator.Done.
|
|
//
|
|
// dst may implement ValueLoader, or may be a *[]Value, *map[string]Value, or struct pointer.
|
|
//
|
|
// If dst is a *[]Value, it will be set to to new []Value whose i'th element
|
|
// will be populated with the i'th column of the row.
|
|
//
|
|
// If dst is a *map[string]Value, a new map will be created if dst is nil. Then
|
|
// for each schema column name, the map key of that name will be set to the column's
|
|
// value. STRUCT types (RECORD types or nested schemas) become nested maps.
|
|
//
|
|
// If dst is pointer to a struct, each column in the schema will be matched
|
|
// with an exported field of the struct that has the same name, ignoring case.
|
|
// Unmatched schema columns and struct fields will be ignored.
|
|
//
|
|
// Each BigQuery column type corresponds to one or more Go types; a matching struct
|
|
// field must be of the correct type. The correspondences are:
|
|
//
|
|
// STRING string
|
|
// BOOL bool
|
|
// INTEGER int, int8, int16, int32, int64, uint8, uint16, uint32
|
|
// FLOAT float32, float64
|
|
// BYTES []byte
|
|
// TIMESTAMP time.Time
|
|
// DATE civil.Date
|
|
// TIME civil.Time
|
|
// DATETIME civil.DateTime
|
|
//
|
|
// A repeated field corresponds to a slice or array of the element type. A STRUCT
|
|
// type (RECORD or nested schema) corresponds to a nested struct or struct pointer.
|
|
// All calls to Next on the same iterator must use the same struct type.
|
|
//
|
|
// It is an error to attempt to read a BigQuery NULL value into a struct field,
|
|
// unless the field is of type []byte or is one of the special Null types: NullInt64,
|
|
// NullFloat64, NullBool, NullString, NullTimestamp, NullDate, NullTime or
|
|
// NullDateTime. You can also use a *[]Value or *map[string]Value to read from a
|
|
// table with NULLs.
|
|
func (it *RowIterator) Next(dst interface{}) error {
|
|
var vl ValueLoader
|
|
switch dst := dst.(type) {
|
|
case ValueLoader:
|
|
vl = dst
|
|
case *[]Value:
|
|
vl = (*valueList)(dst)
|
|
case *map[string]Value:
|
|
vl = (*valueMap)(dst)
|
|
default:
|
|
if !isStructPtr(dst) {
|
|
return fmt.Errorf("bigquery: cannot convert %T to ValueLoader (need pointer to []Value, map[string]Value, or struct)", dst)
|
|
}
|
|
}
|
|
if err := it.nextFunc(); err != nil {
|
|
return err
|
|
}
|
|
row := it.rows[0]
|
|
it.rows = it.rows[1:]
|
|
|
|
if vl == nil {
|
|
// This can only happen if dst is a pointer to a struct. We couldn't
|
|
// set vl above because we need the schema.
|
|
if err := it.structLoader.set(dst, it.Schema); err != nil {
|
|
return err
|
|
}
|
|
vl = &it.structLoader
|
|
}
|
|
return vl.Load(row, it.Schema)
|
|
}
|
|
|
|
// isStructPtr reports whether x is a pointer to a struct.
//
// It returns false for a nil interface value: reflect.TypeOf yields a nil
// Type in that case, and calling Kind on it would panic.
func isStructPtr(x interface{}) bool {
	t := reflect.TypeOf(x)
	if t == nil {
		return false
	}
	return t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct
}
|
|
|
|
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
|
|
func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
|
|
|
|
func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
|
|
res, err := it.pf(it.ctx, it.table, it.Schema, it.StartIndex, int64(pageSize), pageToken)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
it.rows = append(it.rows, res.rows...)
|
|
it.Schema = res.schema
|
|
it.TotalRows = res.totalRows
|
|
return res.pageToken, nil
|
|
}
|
|
|
|
// A pageFetcher returns a page of rows from a destination table.
//
// RowIterator.fetch calls it with the iterator's context, table, current
// Schema and StartIndex, plus the requested page size and page token.
type pageFetcher func(ctx context.Context, _ *Table, _ Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error)
|
|
|
|
// fetchPageResult holds what a pageFetcher produces for a single page.
type fetchPageResult struct {
	pageToken string    // token for the following page; returned by RowIterator.fetch
	rows      [][]Value // the page's rows, appended to the iterator's buffer
	totalRows uint64    // copied into RowIterator.TotalRows
	schema    Schema    // copied into RowIterator.Schema
}
|
|
|
|
// fetchPage gets a page of rows from t.
|
|
func fetchPage(ctx context.Context, t *Table, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
|
|
// Fetch the table schema in the background, if necessary.
|
|
errc := make(chan error, 1)
|
|
if schema != nil {
|
|
errc <- nil
|
|
} else {
|
|
go func() {
|
|
var bqt *bq.Table
|
|
err := runWithRetry(ctx, func() (err error) {
|
|
bqt, err = t.c.bqs.Tables.Get(t.ProjectID, t.DatasetID, t.TableID).
|
|
Fields("schema").
|
|
Context(ctx).
|
|
Do()
|
|
return err
|
|
})
|
|
if err == nil && bqt.Schema != nil {
|
|
schema = bqToSchema(bqt.Schema)
|
|
}
|
|
errc <- err
|
|
}()
|
|
}
|
|
call := t.c.bqs.Tabledata.List(t.ProjectID, t.DatasetID, t.TableID)
|
|
setClientHeader(call.Header())
|
|
if pageToken != "" {
|
|
call.PageToken(pageToken)
|
|
} else {
|
|
call.StartIndex(startIndex)
|
|
}
|
|
if pageSize > 0 {
|
|
call.MaxResults(pageSize)
|
|
}
|
|
var res *bq.TableDataList
|
|
err := runWithRetry(ctx, func() (err error) {
|
|
res, err = call.Context(ctx).Do()
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = <-errc
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rows, err := convertRows(res.Rows, schema)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &fetchPageResult{
|
|
pageToken: res.PageToken,
|
|
rows: rows,
|
|
totalRows: uint64(res.TotalRows),
|
|
schema: schema,
|
|
}, nil
|
|
}
|