frostfs-s3-gw/pkg/s3select/json/preader.go

227 lines
6.3 KiB
Go

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package json
import (
"bufio"
"bytes"
"io"
"runtime"
"sync"
"github.com/bcicen/jstream"
"github.com/minio/minio/pkg/s3select/sql"
)
// PReader - JSON record reader for S3Select.
// Operates concurrently on line-delimited JSON.
type PReader struct {
args *ReaderArgs
readCloser io.ReadCloser // raw input
buf *bufio.Reader // input to the splitter
current []jstream.KVS // current block of results to be returned
recordsRead int // number of records read in current slice
input chan *queueItem // input for workers
queue chan *queueItem // output from workers in order
err error // global error state, only touched by Reader.Read
bufferPool sync.Pool // pool of []byte objects for input
kvDstPool sync.Pool // pool of []jstream.KV used for output
close chan struct{} // used for shutting down the splitter before end of stream
readerWg sync.WaitGroup // used to keep track of async reader.
}
// queueItem is an item in the queue.
type queueItem struct {
input []byte // raw input sent to the worker
dst chan []jstream.KVS // result of block decode
err error // any error encountered will be set here
}
// Read - reads single record.
// Once Read is called the previous record should no longer be referenced.
func (r *PReader) Read(dst sql.Record) (sql.Record, error) {
// If we have have any records left, return these before any error.
for len(r.current) <= r.recordsRead {
if r.err != nil {
return nil, r.err
}
// Move to next block
item, ok := <-r.queue
if !ok {
r.err = io.EOF
return nil, r.err
}
//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
r.kvDstPool.Put(r.current)
r.current = <-item.dst
r.err = item.err
r.recordsRead = 0
}
kvRecord := r.current[r.recordsRead]
r.recordsRead++
dstRec, ok := dst.(*Record)
if !ok {
dstRec = &Record{}
}
dstRec.KVS = kvRecord
dstRec.SelectFormat = sql.SelectFmtJSON
return dstRec, nil
}
// Close - closes underlying reader.
func (r *PReader) Close() error {
if r.close != nil {
close(r.close)
r.readerWg.Wait()
r.close = nil
}
r.recordsRead = len(r.current)
if r.err == nil {
r.err = io.EOF
}
return r.readCloser.Close()
}
// nextSplit will attempt to skip a number of bytes and
// return the buffer until the next newline occurs.
// The last block will be sent along with an io.EOF.
func (r *PReader) nextSplit(skip int, dst []byte) ([]byte, error) {
if cap(dst) < skip {
dst = make([]byte, 0, skip+1024)
}
dst = dst[:skip]
if skip > 0 {
n, err := io.ReadFull(r.buf, dst)
if err != nil && err != io.ErrUnexpectedEOF {
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
return dst[:n], err
}
dst = dst[:n]
if err == io.ErrUnexpectedEOF {
return dst, io.EOF
}
}
// Read until next line.
in, err := r.buf.ReadBytes('\n')
dst = append(dst, in...)
return dst, err
}
// jsonSplitSize is the size of each block.
// Blocks will read this much and find the first following newline.
// 128KB appears to be a very reasonable default.
const jsonSplitSize = 128 << 10
// startReaders will read the header if needed and spin up a parser
// and a number of workers based on GOMAXPROCS.
// If an error is returned no goroutines have been started and r.err will have been set.
func (r *PReader) startReaders() {
r.bufferPool.New = func() interface{} {
return make([]byte, jsonSplitSize+1024)
}
// Create queue
r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0))
r.input = make(chan *queueItem, runtime.GOMAXPROCS(0))
r.readerWg.Add(1)
// Start splitter
go func() {
defer close(r.input)
defer close(r.queue)
defer r.readerWg.Done()
for {
next, err := r.nextSplit(jsonSplitSize, r.bufferPool.Get().([]byte))
q := queueItem{
input: next,
dst: make(chan []jstream.KVS, 1),
err: err,
}
select {
case <-r.close:
return
case r.queue <- &q:
}
select {
case <-r.close:
return
case r.input <- &q:
}
if err != nil {
// Exit on any error.
return
}
}
}()
// Start parsers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
go func() {
for in := range r.input {
if len(in.input) == 0 {
in.dst <- nil
continue
}
dst, ok := r.kvDstPool.Get().([]jstream.KVS)
if !ok {
dst = make([]jstream.KVS, 0, 1000)
}
d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS()
stream := d.Stream()
all := dst[:0]
for mv := range stream {
var kvs jstream.KVS
if mv.ValueType == jstream.Object {
// This is a JSON object type (that preserves key
// order)
kvs = mv.Value.(jstream.KVS)
} else {
// To be AWS S3 compatible Select for JSON needs to
// output non-object JSON as single column value
// i.e. a map with `_1` as key and value as the
// non-object.
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: mv.Value}}
}
all = append(all, kvs)
}
// We don't need the input any more.
//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
r.bufferPool.Put(in.input)
in.input = nil
in.err = d.Err()
in.dst <- all
}
}()
}
}
// NewPReader - creates new parallel JSON reader using readCloser.
// Should only be used for LINES types.
func NewPReader(readCloser io.ReadCloser, args *ReaderArgs) *PReader {
r := &PReader{
args: args,
buf: bufio.NewReaderSize(readCloser, jsonSplitSize*2),
readCloser: readCloser,
close: make(chan struct{}),
}
r.startReaders()
return r
}