forked from TrueCloudLab/restic
241 lines
6.2 KiB
Go
241 lines
6.2 KiB
Go
// Copyright 2016 Google Inc. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package bytestream provides a client for any service that exposes a ByteStream API.
|
|
//
|
|
// Note: This package is a work-in-progress. Backwards-incompatible changes should be expected.
|
|
package bytestream
|
|
|
|
// This file contains the client implementation of Bytestream declared at:
|
|
// https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
|
|
pb "google.golang.org/genproto/googleapis/bytestream"
|
|
)
|
|
|
|
const (
	// MaxBufSize is the maximum buffer size (in bytes) received in a read chunk or sent in a write chunk.
	MaxBufSize = 2 << 20 // 2 MiB

	// backoffBase is the initial delay Read waits after receiving an empty chunk.
	backoffBase = 10 * time.Millisecond
	// backoffMax caps the delay between two consecutive receive attempts.
	backoffMax = time.Second
	// maxTries bounds how many receives a single Read call attempts.
	maxTries = 5
)
|
|
|
|
// Client is the go wrapper around a ByteStreamClient and provides an interface to it.
|
|
type Client struct {
|
|
client pb.ByteStreamClient
|
|
options []grpc.CallOption
|
|
}
|
|
|
|
// NewClient creates a new bytestream.Client.
|
|
func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
|
|
return &Client{
|
|
client: pb.NewByteStreamClient(cc),
|
|
options: options,
|
|
}
|
|
}
|
|
|
|
// Reader reads from a byte stream.
|
|
type Reader struct {
|
|
ctx context.Context
|
|
c *Client
|
|
readClient pb.ByteStream_ReadClient
|
|
resourceName string
|
|
err error
|
|
buf []byte
|
|
}
|
|
|
|
// ResourceName gets the resource name this Reader is reading.
|
|
func (r *Reader) ResourceName() string {
|
|
return r.resourceName
|
|
}
|
|
|
|
// Read implements io.Reader.
|
|
// Read buffers received bytes that do not fit in p.
|
|
func (r *Reader) Read(p []byte) (int, error) {
|
|
if r.err != nil {
|
|
return 0, r.err
|
|
}
|
|
|
|
var backoffDelay time.Duration
|
|
for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
|
|
// No data in buffer.
|
|
resp, err := r.readClient.Recv()
|
|
if err != nil {
|
|
r.err = err
|
|
return 0, err
|
|
}
|
|
r.buf = resp.Data
|
|
if len(r.buf) != 0 {
|
|
break
|
|
}
|
|
|
|
// back off
|
|
if backoffDelay < backoffBase {
|
|
backoffDelay = backoffBase
|
|
} else {
|
|
backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
|
|
}
|
|
if backoffDelay > backoffMax {
|
|
backoffDelay = backoffMax
|
|
}
|
|
select {
|
|
case <-time.After(backoffDelay):
|
|
case <-r.ctx.Done():
|
|
if err := r.ctx.Err(); err != nil {
|
|
r.err = err
|
|
}
|
|
return 0, r.err
|
|
}
|
|
}
|
|
|
|
// Copy from buffer.
|
|
n := copy(p, r.buf)
|
|
r.buf = r.buf[n:]
|
|
return n, nil
|
|
}
|
|
|
|
// Close implements io.Closer.
|
|
func (r *Reader) Close() error {
|
|
if r.readClient == nil {
|
|
return nil
|
|
}
|
|
err := r.readClient.CloseSend()
|
|
r.readClient = nil
|
|
return err
|
|
}
|
|
|
|
// NewReader creates a new Reader to read a resource.
|
|
func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
|
|
return c.NewReaderAt(ctx, resourceName, 0)
|
|
}
|
|
|
|
// NewReader creates a new Reader to read a resource from the given offset.
|
|
func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
|
|
// readClient is set up for Read(). ReadAt() will copy needed fields into its reentrantReader.
|
|
readClient, err := c.client.Read(ctx, &pb.ReadRequest{
|
|
ResourceName: resourceName,
|
|
ReadOffset: offset,
|
|
}, c.options...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Reader{
|
|
ctx: ctx,
|
|
c: c,
|
|
resourceName: resourceName,
|
|
readClient: readClient,
|
|
}, nil
|
|
}
|
|
|
|
// Writer writes to a byte stream.
|
|
type Writer struct {
|
|
ctx context.Context
|
|
writeClient pb.ByteStream_WriteClient
|
|
resourceName string
|
|
offset int64
|
|
backoffDelay time.Duration
|
|
err error
|
|
}
|
|
|
|
// ResourceName gets the resource name this Writer is writing.
|
|
func (w *Writer) ResourceName() string {
|
|
return w.resourceName
|
|
}
|
|
|
|
// Write implements io.Writer.
|
|
func (w *Writer) Write(p []byte) (int, error) {
|
|
if w.err != nil {
|
|
return 0, w.err
|
|
}
|
|
|
|
n := 0
|
|
for n < len(p) {
|
|
bufSize := len(p) - n
|
|
if bufSize > MaxBufSize {
|
|
bufSize = MaxBufSize
|
|
}
|
|
r := pb.WriteRequest{
|
|
WriteOffset: w.offset,
|
|
FinishWrite: false,
|
|
Data: p[n : n+bufSize],
|
|
}
|
|
// Bytestream only requires the resourceName to be sent in the first WriteRequest.
|
|
if w.offset == 0 {
|
|
r.ResourceName = w.resourceName
|
|
}
|
|
err := w.writeClient.Send(&r)
|
|
if err != nil {
|
|
w.err = err
|
|
return n, err
|
|
}
|
|
w.offset += int64(bufSize)
|
|
n += bufSize
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done.
|
|
func (w *Writer) Close() error {
|
|
err := w.writeClient.Send(&pb.WriteRequest{
|
|
ResourceName: w.resourceName,
|
|
WriteOffset: w.offset,
|
|
FinishWrite: true,
|
|
Data: nil,
|
|
})
|
|
if err != nil {
|
|
w.err = err
|
|
return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err)
|
|
}
|
|
resp, err := w.writeClient.CloseAndRecv()
|
|
if err != nil {
|
|
w.err = err
|
|
return fmt.Errorf("CloseAndRecv: %v", err)
|
|
}
|
|
if resp == nil {
|
|
err = fmt.Errorf("expected a response on close, got %v", resp)
|
|
} else if resp.CommittedSize != w.offset {
|
|
err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
|
|
}
|
|
w.err = err
|
|
return err
|
|
}
|
|
|
|
// NewWriter creates a new Writer to write a resource.
|
|
//
|
|
// resourceName specifies the name of the resource.
|
|
// The resource will be available after Close has been called.
|
|
//
|
|
// It is the caller's responsibility to call Close when writing is done.
|
|
//
|
|
// TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus.
|
|
func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
|
|
wc, err := c.client.Write(ctx, c.options...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Writer{
|
|
ctx: ctx,
|
|
writeClient: wc,
|
|
resourceName: resourceName,
|
|
}, nil
|
|
}
|