vendor: Update AzureSDK version to latest one, fixes failing integration tests

This commit is contained in:
sandeepkru 2018-09-06 21:44:08 -07:00 committed by Nick Craig-Wood
parent 5b27702b61
commit 233507bfe0
30 changed files with 961 additions and 323 deletions

View file

@ -12,10 +12,12 @@ import (
"sync"
"time"
"errors"
"github.com/Azure/azure-pipeline-go/pipeline"
)
// CommonResponseHeaders returns the headers common to all blob REST API responses.
// CommonResponse returns the headers common to all blob REST API responses.
type CommonResponse interface {
// ETag returns the value for header ETag.
ETag() ETag
@ -42,6 +44,7 @@ type UploadToBlockBlobOptions struct {
BlockSize int64
// Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL.
// Note that the progress reporting is not always increasing; it can go down when retrying a request.
Progress pipeline.ProgressReceiver
// BlobHTTPHeaders indicates the HTTP headers to be associated with the blob.
@ -65,12 +68,26 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
if o.BlockSize < 0 || o.BlockSize > BlockBlobMaxUploadBlobBytes {
panic(fmt.Sprintf("BlockSize option must be > 0 and <= %d", BlockBlobMaxUploadBlobBytes))
}
if o.BlockSize == 0 {
o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
}
size := int64(len(b))
if size <= BlockBlobMaxUploadBlobBytes {
bufferSize := int64(len(b))
if o.BlockSize == 0 {
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
return nil, errors.New("Buffer is too large to upload to a block blob")
}
// If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
if bufferSize <= BlockBlobMaxUploadBlobBytes {
o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
} else {
o.BlockSize = bufferSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
o.BlockSize = BlobDefaultDownloadBlockSize
}
// StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize).
}
}
if bufferSize <= BlockBlobMaxUploadBlobBytes {
// If the size can fit in 1 Upload call, do it this way
var body io.ReadSeeker = bytes.NewReader(b)
if o.Progress != nil {
@ -79,7 +96,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
}
var numBlocks = uint16(((size - 1) / o.BlockSize) + 1)
var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1)
if numBlocks > BlockBlobMaxBlocks {
panic(fmt.Sprintf("The buffer's size is too big or the BlockSize is too small; the number of blocks must be <= %d", BlockBlobMaxBlocks))
}
@ -90,7 +107,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
err := doBatchTransfer(ctx, batchTransferOptions{
operationName: "UploadBufferToBlockBlob",
transferSize: size,
transferSize: bufferSize,
chunkSize: o.BlockSize,
parallelism: o.Parallelism,
operation: func(offset int64, count int64) error {
@ -115,7 +132,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
// at the same time causing PutBlockList to get a mix of blocks from all the clients.
blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions)
_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil)
return err
},
})
@ -147,10 +164,9 @@ func UploadFileToBlockBlob(ctx context.Context, file *os.File,
///////////////////////////////////////////////////////////////////////////////
const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB
// DownloadFromAzureFileOptions identifies options used by the DownloadAzureFileToBuffer and DownloadAzureFileToFile functions.
// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions.
type DownloadFromBlobOptions struct {
// BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize.
BlockSize int64
@ -168,10 +184,9 @@ type DownloadFromBlobOptions struct {
RetryReaderOptionsPerBlock RetryReaderOptions
}
// downloadAzureFileToBuffer downloads an Azure file to a buffer with parallel.
// downloadBlobToBuffer downloads an Azure blob to a buffer with parallel.
func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
ac BlobAccessConditions, b []byte, o DownloadFromBlobOptions,
initialDownloadResponse *DownloadResponse) error {
b []byte, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
// Validate parameters, and set defaults.
if o.BlockSize < 0 {
panic("BlockSize option must be >= 0")
@ -193,7 +208,7 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it
} else {
// If we don't have the length at all, get it
dr, err := blobURL.Download(ctx, 0, CountToEnd, ac, false)
dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false)
if err != nil {
return err
}
@ -211,11 +226,11 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
err := doBatchTransfer(ctx, batchTransferOptions{
operationName: "downloadBlobToBuffer",
transferSize: count,
transferSize: count,
chunkSize: o.BlockSize,
parallelism: o.Parallelism,
operation: func(chunkStart int64, count int64) error {
dr, err := blobURL.Download(ctx, chunkStart+ offset, count, ac, false)
dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false)
body := dr.Body(o.RetryReaderOptionsPerBlock)
if o.Progress != nil {
rangeProgress := int64(0)
@ -241,18 +256,18 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
return nil
}
// DownloadAzureFileToBuffer downloads an Azure file to a buffer with parallel.
// DownloadBlobToBuffer downloads an Azure blob to a buffer with parallel.
// Offset and count are optional, pass 0 for both to download the entire blob.
func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
ac BlobAccessConditions, b []byte, o DownloadFromBlobOptions) error {
return downloadBlobToBuffer(ctx, blobURL, offset, count, ac, b, o, nil)
b []byte, o DownloadFromBlobOptions) error {
return downloadBlobToBuffer(ctx, blobURL, offset, count, b, o, nil)
}
// DownloadBlobToFile downloads an Azure file to a local file.
// DownloadBlobToFile downloads an Azure blob to a local file.
// The file would be truncated if the size doesn't match.
// Offset and count are optional, pass 0 for both to download the entire blob.
func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64,
ac BlobAccessConditions, file *os.File, o DownloadFromBlobOptions) error {
file *os.File, o DownloadFromBlobOptions) error {
// 1. Validate parameters.
if file == nil {
panic("file must not be nil")
@ -262,8 +277,8 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
var size int64
if count == CountToEnd {
// Try to get Azure file's size
props, err := blobURL.GetProperties(ctx, ac)
// Try to get Azure blob's size
props, err := blobURL.GetProperties(ctx, o.AccessConditions)
if err != nil {
return err
}
@ -272,7 +287,7 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
size = count
}
// 3. Compare and try to resize local file's size if it doesn't match Azure file's size.
// 3. Compare and try to resize local file's size if it doesn't match Azure blob's size.
stat, err := file.Stat()
if err != nil {
return err
@ -284,19 +299,18 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
}
if size > 0 {
// 4. Set mmap and call DownloadAzureFileToBuffer.
// 4. Set mmap and call downloadBlobToBuffer.
m, err := newMMF(file, true, 0, int(size))
if err != nil {
return err
}
defer m.unmap()
return downloadBlobToBuffer(ctx, blobURL, offset, size, ac, m, o, nil)
return downloadBlobToBuffer(ctx, blobURL, offset, size, m, o, nil)
} else { // if the blob's size is 0, there is no need in downloading it
return nil
}
}
///////////////////////////////////////////////////////////////////////////////
// BatchTransferOptions identifies options used by doBatchTransfer.
@ -374,7 +388,10 @@ func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL
result, err := uploadStream(ctx, reader,
UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers},
&uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()})
return result.(CommonResponse), err
if err != nil {
return nil, err
}
return result.(CommonResponse), nil
}
type uploadStreamToBlockBlobOptions struct {
@ -396,7 +413,7 @@ func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32,
return nil
}
// Else, upload a staged block...
AtomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
atomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
// Atomically remember (in t.numBlocks) the maximum block num we've ever seen
if startVal < num {
return num, nil
@ -404,7 +421,7 @@ func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32,
return startVal, nil
})
blockID := newUuidBlockID(t.blockIDPrefix).WithBlockNumber(num).ToBase64()
_, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), LeaseAccessConditions{})
_, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), LeaseAccessConditions{}, nil)
return err
}
@ -416,7 +433,7 @@ func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{},
}
// Multiple blocks staged, commit them all now
blockID := newUuidBlockID(t.blockIDPrefix)
blockIDs := make([]string, t.maxBlockNum + 1)
blockIDs := make([]string, t.maxBlockNum+1)
for bn := uint32(0); bn <= t.maxBlockNum; bn++ {
blockIDs[bn] = blockID.WithBlockNumber(bn).ToBase64()
}
@ -436,7 +453,28 @@ type UploadStreamOptions struct {
BufferSize int
}
type firstErr struct {
lock sync.Mutex
finalError error
}
func (fe *firstErr) set(err error) {
fe.lock.Lock()
if fe.finalError == nil {
fe.finalError = err
}
fe.lock.Unlock()
}
func (fe *firstErr) get() (err error) {
fe.lock.Lock()
err = fe.finalError
fe.lock.Unlock()
return
}
func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) {
firstErr := firstErr{}
ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything
defer cancel()
wg := sync.WaitGroup{} // Used to know when all outgoing messages have finished processing
@ -463,9 +501,12 @@ func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions,
err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer)
wg.Done() // Indicate this buffer was sent
if nil != err {
// NOTE: finalErr could be assigned to multiple times here which is OK,
// some error will be returned.
firstErr.set(err)
cancel()
}
incoming <- outgoingMsg.buffer // The goroutine reading from the stream can use reuse this buffer now
incoming <- outgoingMsg.buffer // The goroutine reading from the stream can reuse this buffer now
}
}()
}
@ -490,7 +531,7 @@ func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions,
buffer = <-incoming
}
n, err := io.ReadFull(reader, buffer)
if err != nil {
if err != nil { // Less than len(buffer) bytes were read
buffer = buffer[:n] // Make slice match the # of read bytes
}
if len(buffer) > 0 {
@ -499,12 +540,21 @@ func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions,
outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer}
}
if err != nil { // The reader is done, no more outgoing buffers
if err == io.EOF || err == io.ErrUnexpectedEOF {
err = nil // This function does NOT return an error if io.ReadFull returns io.EOF or io.ErrUnexpectedEOF
} else {
firstErr.set(err)
}
break
}
}
// NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done
close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty
wg.Wait() // Wait for all pending outgoing messages to complete
// After all blocks uploaded, commit them to the blob & return the result
return t.end(ctx)
err := firstErr.get()
if err == nil {
// If no error, after all blocks uploaded, commit them to the blob & return the result
return t.end(ctx)
}
return nil, err
}