package azblob import ( "context" "encoding/base64" "io" "net/http" "bytes" "os" "sync" "time" "errors" "github.com/Azure/azure-pipeline-go/pipeline" ) // CommonResponse returns the headers common to all blob REST API responses. type CommonResponse interface { // ETag returns the value for header ETag. ETag() ETag // LastModified returns the value for header Last-Modified. LastModified() time.Time // RequestID returns the value for header x-ms-request-id. RequestID() string // Date returns the value for header Date. Date() time.Time // Version returns the value for header x-ms-version. Version() string // Response returns the raw HTTP response object. Response() *http.Response } // UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions. type UploadToBlockBlobOptions struct { // BlockSize specifies the block size to use; the default (and maximum size) is BlockBlobMaxStageBlockBytes. BlockSize int64 // Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL. // Note that the progress reporting is not always increasing; it can go down when retrying a request. Progress pipeline.ProgressReceiver // BlobHTTPHeaders indicates the HTTP headers to be associated with the blob. BlobHTTPHeaders BlobHTTPHeaders // Metadata indicates the metadata to be associated with the blob when PutBlockList is called. Metadata Metadata // AccessConditions indicates the access conditions for the block blob. AccessConditions BlobAccessConditions // Parallelism indicates the maximum number of blocks to upload in parallel (0=default) Parallelism uint16 } // UploadBufferToBlockBlob uploads a buffer in blocks to a block blob. func UploadBufferToBlockBlob(ctx context.Context, b []byte, blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { bufferSize := int64(len(b)) if o.BlockSize == 0 { // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks { return nil, errors.New("Buffer is too large to upload to a block blob") } // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request if bufferSize <= BlockBlobMaxUploadBlobBytes { o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified } else { o.BlockSize = bufferSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB o.BlockSize = BlobDefaultDownloadBlockSize } // StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize). } } if bufferSize <= BlockBlobMaxUploadBlobBytes { // If the size can fit in 1 Upload call, do it this way var body io.ReadSeeker = bytes.NewReader(b) if o.Progress != nil { body = pipeline.NewRequestBodyProgress(body, o.Progress) } return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions) } var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1) blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs progress := int64(0) progressLock := &sync.Mutex{} err := doBatchTransfer(ctx, batchTransferOptions{ operationName: "UploadBufferToBlockBlob", transferSize: bufferSize, chunkSize: o.BlockSize, parallelism: o.Parallelism, operation: func(offset int64, count int64) error { // This function is called once per block. // It is passed this block's offset within the buffer and its count of bytes // Prepare to read the proper block/section of the buffer var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count]) blockNum := offset / o.BlockSize if o.Progress != nil { blockProgress := int64(0) body = pipeline.NewRequestBodyProgress(body, func(bytesTransferred int64) { diff := bytesTransferred - blockProgress blockProgress = bytesTransferred progressLock.Lock() // 1 goroutine at a time gets a progress report progress += diff o.Progress(progress) progressLock.Unlock() }) } // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks // at the same time causing PutBlockList to get a mix of blocks from all the clients. blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes()) _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil) return err }, }) if err != nil { return nil, err } // All put blocks were successful, call Put Block List to finalize the blob return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions) } // UploadFileToBlockBlob uploads a file in blocks to a block blob. func UploadFileToBlockBlob(ctx context.Context, file *os.File, blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { stat, err := file.Stat() if err != nil { return nil, err } m := mmf{} // Default to an empty slice; used for 0-size file if stat.Size() != 0 { m, err = newMMF(file, false, 0, int(stat.Size())) if err != nil { return nil, err } defer m.unmap() } return UploadBufferToBlockBlob(ctx, m, blockBlobURL, o) } /////////////////////////////////////////////////////////////////////////////// const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB // DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions. type DownloadFromBlobOptions struct { // BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize. BlockSize int64 // Progress is a function that is invoked periodically as bytes are received. Progress pipeline.ProgressReceiver // AccessConditions indicates the access conditions used when making HTTP GET requests against the blob. AccessConditions BlobAccessConditions // Parallelism indicates the maximum number of blocks to download in parallel (0=default) Parallelism uint16 // RetryReaderOptionsPerBlock is used when downloading each block. RetryReaderOptionsPerBlock RetryReaderOptions } // downloadBlobToBuffer downloads an Azure blob to a buffer with parallel. func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64, b []byte, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error { if o.BlockSize == 0 { o.BlockSize = BlobDefaultDownloadBlockSize } if count == CountToEnd { // If size not specified, calculate it if initialDownloadResponse != nil { count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it } else { // If we don't have the length at all, get it dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false) if err != nil { return err } count = dr.ContentLength() - offset } } // Prepare and do parallel download. progress := int64(0) progressLock := &sync.Mutex{} err := doBatchTransfer(ctx, batchTransferOptions{ operationName: "downloadBlobToBuffer", transferSize: count, chunkSize: o.BlockSize, parallelism: o.Parallelism, operation: func(chunkStart int64, count int64) error { dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false) body := dr.Body(o.RetryReaderOptionsPerBlock) if o.Progress != nil { rangeProgress := int64(0) body = pipeline.NewResponseBodyProgress( body, func(bytesTransferred int64) { diff := bytesTransferred - rangeProgress rangeProgress = bytesTransferred progressLock.Lock() progress += diff o.Progress(progress) progressLock.Unlock() }) } _, err = io.ReadFull(body, b[chunkStart:chunkStart+count]) body.Close() return err }, }) if err != nil { return err } return nil } // DownloadBlobToBuffer downloads an Azure blob to a buffer with parallel. // Offset and count are optional, pass 0 for both to download the entire blob. func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64, b []byte, o DownloadFromBlobOptions) error { return downloadBlobToBuffer(ctx, blobURL, offset, count, b, o, nil) } // DownloadBlobToFile downloads an Azure blob to a local file. // The file would be truncated if the size doesn't match. // Offset and count are optional, pass 0 for both to download the entire blob. func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64, file *os.File, o DownloadFromBlobOptions) error { // 1. Calculate the size of the destination file var size int64 if count == CountToEnd { // Try to get Azure blob's size props, err := blobURL.GetProperties(ctx, o.AccessConditions) if err != nil { return err } size = props.ContentLength() - offset } else { size = count } // 2. Compare and try to resize local file's size if it doesn't match Azure blob's size. stat, err := file.Stat() if err != nil { return err } if stat.Size() != size { if err = file.Truncate(size); err != nil { return err } } if size > 0 { // 3. Set mmap and call downloadBlobToBuffer. m, err := newMMF(file, true, 0, int(size)) if err != nil { return err } defer m.unmap() return downloadBlobToBuffer(ctx, blobURL, offset, size, m, o, nil) } else { // if the blob's size is 0, there is no need in downloading it return nil } } /////////////////////////////////////////////////////////////////////////////// // BatchTransferOptions identifies options used by doBatchTransfer. type batchTransferOptions struct { transferSize int64 chunkSize int64 parallelism uint16 operation func(offset int64, chunkSize int64) error operationName string } // doBatchTransfer helps to execute operations in a batch manner. func doBatchTransfer(ctx context.Context, o batchTransferOptions) error { // Prepare and do parallel operations. numChunks := uint16(((o.transferSize - 1) / o.chunkSize) + 1) operationChannel := make(chan func() error, o.parallelism) // Create the channel that release 'parallelism' goroutines concurrently operationResponseChannel := make(chan error, numChunks) // Holds each response ctx, cancel := context.WithCancel(ctx) defer cancel() // Create the goroutines that process each operation (in parallel). if o.parallelism == 0 { o.parallelism = 5 // default parallelism } for g := uint16(0); g < o.parallelism; g++ { //grIndex := g go func() { for f := range operationChannel { //fmt.Printf("[%s] gr-%d start action\n", o.operationName, grIndex) err := f() operationResponseChannel <- err //fmt.Printf("[%s] gr-%d end action\n", o.operationName, grIndex) } }() } // Add each chunk's operation to the channel. for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { curChunkSize := o.chunkSize if chunkNum == numChunks-1 { // Last chunk curChunkSize = o.transferSize - (int64(chunkNum) * o.chunkSize) // Remove size of all transferred chunks from total } offset := int64(chunkNum) * o.chunkSize operationChannel <- func() error { return o.operation(offset, curChunkSize) } } close(operationChannel) // Wait for the operations to complete. for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { responseError := <-operationResponseChannel if responseError != nil { cancel() // As soon as any operation fails, cancel all remaining operation calls return responseError // No need to process anymore responses } } return nil } //////////////////////////////////////////////////////////////////////////////////////////////// type UploadStreamToBlockBlobOptions struct { BufferSize int MaxBuffers int BlobHTTPHeaders BlobHTTPHeaders Metadata Metadata AccessConditions BlobAccessConditions } func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, o UploadStreamToBlockBlobOptions) (CommonResponse, error) { result, err := uploadStream(ctx, reader, UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers}, &uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()}) if err != nil { return nil, err } return result.(CommonResponse), nil } type uploadStreamToBlockBlobOptions struct { b BlockBlobURL o UploadStreamToBlockBlobOptions blockIDPrefix uuid // UUID used with all blockIDs maxBlockNum uint32 // defaults to 0 firstBlock []byte // Used only if maxBlockNum is 0 } func (t *uploadStreamToBlockBlobOptions) start(ctx context.Context) (interface{}, error) { return nil, nil } func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32, buffer []byte) error { if num == 0 { t.firstBlock = buffer // If whole payload fits in 1 block, don't stage it; End will upload it with 1 I/O operation // If the payload is exactly the same size as the buffer, there may be more content coming in. if len(buffer) < t.o.BufferSize { return nil } } // Else, upload a staged block... atomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) { // Atomically remember (in t.numBlocks) the maximum block num we've ever seen if startVal < num { return num, nil } return startVal, nil }) blockID := newUuidBlockID(t.blockIDPrefix).WithBlockNumber(num).ToBase64() _, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), LeaseAccessConditions{}, nil) return err } func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{}, error) { // If the first block had the exact same size as the buffer // we would have staged it as a block thinking that there might be more data coming if t.maxBlockNum == 0 && len(t.firstBlock) != t.o.BufferSize { // If whole payload fits in 1 block (block #0), upload it with 1 I/O operation return t.b.Upload(ctx, bytes.NewReader(t.firstBlock), t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions) } // Multiple blocks staged, commit them all now blockID := newUuidBlockID(t.blockIDPrefix) blockIDs := make([]string, t.maxBlockNum+1) for bn := uint32(0); bn <= t.maxBlockNum; bn++ { blockIDs[bn] = blockID.WithBlockNumber(bn).ToBase64() } return t.b.CommitBlockList(ctx, blockIDs, t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions) } //////////////////////////////////////////////////////////////////////////////////////////////////// type iTransfer interface { start(ctx context.Context) (interface{}, error) chunk(ctx context.Context, num uint32, buffer []byte) error end(ctx context.Context) (interface{}, error) } type UploadStreamOptions struct { MaxBuffers int BufferSize int } type firstErr struct { lock sync.Mutex finalError error } func (fe *firstErr) set(err error) { fe.lock.Lock() if fe.finalError == nil { fe.finalError = err } fe.lock.Unlock() } func (fe *firstErr) get() (err error) { fe.lock.Lock() err = fe.finalError fe.lock.Unlock() return } func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) { firstErr := firstErr{} ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything defer cancel() wg := sync.WaitGroup{} // Used to know when all outgoing messages have finished processing type OutgoingMsg struct { chunkNum uint32 buffer []byte } // Create a channel to hold the buffers usable for incoming datsa incoming := make(chan []byte, o.MaxBuffers) outgoing := make(chan OutgoingMsg, o.MaxBuffers) // Channel holding outgoing buffers if result, err := t.start(ctx); err != nil { return result, err } numBuffers := 0 // The number of buffers & out going goroutines created so far injectBuffer := func() { // For each Buffer, create it and a goroutine to upload it incoming <- make([]byte, o.BufferSize) // Add the new buffer to the incoming channel so this goroutine can from the reader into it numBuffers++ go func() { for outgoingMsg := range outgoing { // Upload the outgoing buffer err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer) wg.Done() // Indicate this buffer was sent if nil != err { // NOTE: finalErr could be assigned to multiple times here which is OK, // some error will be returned. firstErr.set(err) cancel() } incoming <- outgoingMsg.buffer // The goroutine reading from the stream can reuse this buffer now } }() } injectBuffer() // Create our 1st buffer & outgoing goroutine // This goroutine grabs a buffer, reads from the stream into the buffer, // and inserts the buffer into the outgoing channel to be uploaded for c := uint32(0); true; c++ { // Iterate once per chunk var buffer []byte if numBuffers < o.MaxBuffers { select { // We're not at max buffers, see if a previously-created buffer is available case buffer = <-incoming: break default: // No buffer available; inject a new buffer & go routine to process it injectBuffer() buffer = <-incoming // Grab the just-injected buffer } } else { // We are at max buffers, block until we get to reuse one buffer = <-incoming } n, err := io.ReadFull(reader, buffer) if err != nil { // Less than len(buffer) bytes were read buffer = buffer[:n] // Make slice match the # of read bytes } if len(buffer) > 0 { // Buffer not empty, upload it wg.Add(1) // We're posting a buffer to be sent outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer} } if err != nil { // The reader is done, no more outgoing buffers if err == io.EOF || err == io.ErrUnexpectedEOF { err = nil // This function does NOT return an error if io.ReadFull returns io.EOF or io.ErrUnexpectedEOF } else { firstErr.set(err) } break } } // NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty wg.Wait() // Wait for all pending outgoing messages to complete err := firstErr.get() if err == nil { // If no error, after all blocks uploaded, commit them to the blob & return the result return t.end(ctx) } return nil, err }