/*
 * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package json

import (
	"bufio"
	"bytes"
	"io"
	"runtime"
	"sync"

	"github.com/bcicen/jstream"
	"github.com/minio/minio/pkg/s3select/sql"
)

// PReader - JSON record reader for S3Select.
// Operates concurrently on line-delimited JSON.
type PReader struct {
	args        *ReaderArgs
	readCloser  io.ReadCloser   // raw input
	buf         *bufio.Reader   // input to the splitter
	current     []jstream.KVS   // current block of results to be returned
	recordsRead int             // number of records read in current slice
	input       chan *queueItem // input for workers
	queue       chan *queueItem // output from workers in order
	err         error           // global error state, only touched by Reader.Read
	bufferPool  sync.Pool       // pool of []byte objects for input
	kvDstPool   sync.Pool       // pool of []jstream.KV used for output
	close       chan struct{}   // used for shutting down the splitter before end of stream
	readerWg    sync.WaitGroup  // used to keep track of async reader.
}

// queueItem is an item in the queue.
type queueItem struct {
	input []byte             // raw input sent to the worker
	dst   chan []jstream.KVS // result of block decode
	err   error              // any error encountered will be set here
}

// Read - reads single record.
// Once Read is called the previous record should no longer be referenced.
func (r *PReader) Read(dst sql.Record) (sql.Record, error) {
	// If we have have any records left, return these before any error.
	for len(r.current) <= r.recordsRead {
		if r.err != nil {
			return nil, r.err
		}
		// Move to next block
		item, ok := <-r.queue
		if !ok {
			r.err = io.EOF
			return nil, r.err
		}
		//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
		r.kvDstPool.Put(r.current)
		r.current = <-item.dst
		r.err = item.err
		r.recordsRead = 0
	}
	kvRecord := r.current[r.recordsRead]
	r.recordsRead++

	dstRec, ok := dst.(*Record)
	if !ok {
		dstRec = &Record{}
	}
	dstRec.KVS = kvRecord
	dstRec.SelectFormat = sql.SelectFmtJSON
	return dstRec, nil
}

// Close - closes underlying reader.
func (r *PReader) Close() error {
	if r.close != nil {
		close(r.close)
		r.readerWg.Wait()
		r.close = nil
	}
	r.recordsRead = len(r.current)
	if r.err == nil {
		r.err = io.EOF
	}
	return r.readCloser.Close()
}

// nextSplit will attempt to skip a number of bytes and
// return the buffer until the next newline occurs.
// The last block will be sent along with an io.EOF.
func (r *PReader) nextSplit(skip int, dst []byte) ([]byte, error) {
	if cap(dst) < skip {
		dst = make([]byte, 0, skip+1024)
	}
	dst = dst[:skip]
	if skip > 0 {
		n, err := io.ReadFull(r.buf, dst)
		if err != nil && err != io.ErrUnexpectedEOF {
			// If an EOF happens after reading some but not all the bytes,
			// ReadFull returns ErrUnexpectedEOF.
			return dst[:n], err
		}
		dst = dst[:n]
		if err == io.ErrUnexpectedEOF {
			return dst, io.EOF
		}
	}
	// Read until next line.
	in, err := r.buf.ReadBytes('\n')
	dst = append(dst, in...)
	return dst, err
}

// jsonSplitSize is the size of each block.
// Blocks will read this much and find the first following newline.
// 128KB appears to be a very reasonable default.
const jsonSplitSize = 128 << 10

// startReaders will read the header if needed and spin up a parser
// and a number of workers based on GOMAXPROCS.
// If an error is returned no goroutines have been started and r.err will have been set.
func (r *PReader) startReaders() {
	r.bufferPool.New = func() interface{} {
		return make([]byte, jsonSplitSize+1024)
	}

	// Create queue
	r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0))
	r.input = make(chan *queueItem, runtime.GOMAXPROCS(0))
	r.readerWg.Add(1)

	// Start splitter
	go func() {
		defer close(r.input)
		defer close(r.queue)
		defer r.readerWg.Done()
		for {
			next, err := r.nextSplit(jsonSplitSize, r.bufferPool.Get().([]byte))
			q := queueItem{
				input: next,
				dst:   make(chan []jstream.KVS, 1),
				err:   err,
			}
			select {
			case <-r.close:
				return
			case r.queue <- &q:
			}

			select {
			case <-r.close:
				return
			case r.input <- &q:
			}
			if err != nil {
				// Exit on any error.
				return
			}
		}
	}()

	// Start parsers
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		go func() {
			for in := range r.input {
				if len(in.input) == 0 {
					in.dst <- nil
					continue
				}
				dst, ok := r.kvDstPool.Get().([]jstream.KVS)
				if !ok {
					dst = make([]jstream.KVS, 0, 1000)
				}

				d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS()
				stream := d.Stream()
				all := dst[:0]
				for mv := range stream {
					var kvs jstream.KVS
					if mv.ValueType == jstream.Object {
						// This is a JSON object type (that preserves key
						// order)
						kvs = mv.Value.(jstream.KVS)
					} else {
						// To be AWS S3 compatible Select for JSON needs to
						// output non-object JSON as single column value
						// i.e. a map with `_1` as key and value as the
						// non-object.
						kvs = jstream.KVS{jstream.KV{Key: "_1", Value: mv.Value}}
					}
					all = append(all, kvs)
				}
				// We don't need the input any more.
				//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
				r.bufferPool.Put(in.input)
				in.input = nil
				in.err = d.Err()
				in.dst <- all
			}
		}()
	}
}

// NewPReader - creates new parallel JSON reader using readCloser.
// Should only be used for LINES types.
func NewPReader(readCloser io.ReadCloser, args *ReaderArgs) *PReader {
	r := &PReader{
		args:       args,
		buf:        bufio.NewReaderSize(readCloser, jsonSplitSize*2),
		readCloser: readCloser,
		close:      make(chan struct{}),
	}
	r.startReaders()
	return r
}