From e7485c831feb33d289697c43fe3f771d343c7f74 Mon Sep 17 00:00:00 2001
From: Ahmet Alp Balkan
Date: Tue, 3 Feb 2015 15:29:00 -0800
Subject: [PATCH] Support writes to random offsets in Azure storage driver

Signed-off-by: Ahmet Alp Balkan
---
 storagedriver/azure/azure.go               | 106 +------
 storagedriver/azure/blockblob.go           |  24 ++
 storagedriver/azure/blockblob_test.go      | 155 ++++++++++
 storagedriver/azure/blockid.go             |  60 ++++
 storagedriver/azure/blockid_test.go        |  74 +++++
 storagedriver/azure/randomwriter.go        | 208 +++++++++++++
 storagedriver/azure/randomwriter_test.go   | 339 +++++++++++++++++++++
 storagedriver/azure/zerofillwriter.go      |  49 +++
 storagedriver/azure/zerofillwriter_test.go | 126 ++++++++
 9 files changed, 1045 insertions(+), 96 deletions(-)
 create mode 100644 storagedriver/azure/blockblob.go
 create mode 100644 storagedriver/azure/blockblob_test.go
 create mode 100644 storagedriver/azure/blockid.go
 create mode 100644 storagedriver/azure/blockid_test.go
 create mode 100644 storagedriver/azure/randomwriter.go
 create mode 100644 storagedriver/azure/randomwriter_test.go
 create mode 100644 storagedriver/azure/zerofillwriter.go
 create mode 100644 storagedriver/azure/zerofillwriter_test.go

diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go
index 995f0fcad..9a7f6e1f2 100644
--- a/storagedriver/azure/azure.go
+++ b/storagedriver/azure/azure.go
@@ -4,12 +4,10 @@ package azure
 import (
     "bytes"
-    "encoding/base64"
     "fmt"
     "io"
     "io/ioutil"
     "net/http"
-    "strconv"
     "strings"
     "time"
 
@@ -30,7 +28,7 @@ const (
 // Driver is a storagedriver.StorageDriver implementation backed by
 // Microsoft Azure Blob Storage Service.
 type Driver struct {
-    client    *azure.BlobStorageClient
+    client    azure.BlobStorageClient
     container string
 }
 
@@ -79,7 +77,7 @@ func New(accountName, accountKey, container string) (*Driver, error) {
     }
 
     return &Driver{
-        client:    blobClient,
+        client:    *blobClient,
         container: container}, nil
 }
 
@@ -140,92 +138,22 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
 // WriteStream stores the contents of the provided io.ReadCloser at a location
 // designated by the given path.
 func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) {
-    if !storagedriver.PathRegexp.MatchString(path) {
-        return 0, storagedriver.InvalidPathError{Path: path}
-    }
-
-    var (
-        lastBlockNum    int
-        resumableOffset int64
-        blocks          []azure.Block
-    )
-
     if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
         return 0, err
-    } else if !blobExists { // new blob
-        lastBlockNum = 0
-        resumableOffset = 0
-    } else { // append
-        if parts, err := d.client.GetBlockList(d.container, path, azure.BlockListTypeCommitted); err != nil {
+    } else if !blobExists {
+        err := d.client.CreateBlockBlob(d.container, path)
+        if err != nil {
             return 0, err
-        } else if len(parts.CommittedBlocks) == 0 {
-            lastBlockNum = 0
-            resumableOffset = 0
-        } else {
-            lastBlock := parts.CommittedBlocks[len(parts.CommittedBlocks)-1]
-            if lastBlockNum, err = fromBlockID(lastBlock.Name); err != nil {
-                return 0, fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
-            }
-
-            var totalSize int64
-            for _, v := range parts.CommittedBlocks {
-                blocks = append(blocks, azure.Block{
-                    Id:     v.Name,
-                    Status: azure.BlockStatusCommitted})
-                totalSize += int64(v.Size)
-            }
-
-            // NOTE: Azure driver currently supports only append mode (resumable
-            // index is exactly where the committed blocks of the blob end).
-            // In order to support writing to offsets other than last index,
-            // adjacent blocks overlapping with the [offset:offset+size] area
-            // must be fetched, splitted and should be overwritten accordingly.
-            // As the current use of this method is append only, that implementation
-            // is omitted.
-            resumableOffset = totalSize
         }
     }
-
-    if offset < resumableOffset {
-        // only writing at the end or after the end of the file is supported
+    if offset < 0 {
         return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
-    } else if offset > resumableOffset {
-        // zero-fill in between, construct a multi-reader
-        zeroReader := bytes.NewReader(make([]byte, offset-resumableOffset))
-        reader = io.MultiReader(zeroReader, reader)
     }
 
-    // Put content
-    var nn int64
-    buf := make([]byte, azure.MaxBlobBlockSize)
-    for {
-        // Read chunks of exactly size N except the last chunk to
-        // maximize block size and minimize block count.
-        n, err := io.ReadFull(reader, buf)
-        if err == io.EOF {
-            break
-        }
-        nn += int64(n)
-
-        data := buf[:n]
-        lastBlockNum++
-        blockID := toBlockID(lastBlockNum)
-        if err = d.client.PutBlock(d.container, path, blockID, data); err != nil {
-            return 0, err
-        }
-
-        blocks = append(blocks, azure.Block{
-            Id:     blockID,
-            Status: azure.BlockStatusLatest})
-    }
-
-    // If there was a zero-fill, adjust nn to exclude zeros
-    if offset > resumableOffset {
-        nn -= offset - resumableOffset
-    }
-
-    // Commit block list
-    return nn, d.client.PutBlockList(d.container, path, blocks)
+    bs := newAzureBlockStorage(d.client)
+    bw := newRandomBlobWriter(&bs, azure.MaxBlobBlockSize)
+    zw := newZeroFillWriter(&bw)
+    return zw.Write(d.container, path, offset, reader)
 }
 
 // Stat retrieves the FileInfo for the given path, including the current size
@@ -436,17 +364,3 @@ func is404(err error) bool {
     e, ok := err.(azure.StorageServiceError)
     return ok && e.StatusCode == 404
 }
-
-func fromBlockID(b64Name string) (int, error) {
-    s, err := base64.StdEncoding.DecodeString(b64Name)
-    if err != nil {
-        return 0, err
-    }
-
-    return strconv.Atoi(string(s))
-}
-
-func toBlockID(i int) string {
-    s := fmt.Sprintf("%010d", i) // add zero padding
-    return base64.StdEncoding.EncodeToString([]byte(s))
-}
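Review aside: the rewritten WriteStream simply stacks the three pieces the new files below introduce. A condensed sketch of that composition (editorial, not part of the patch; writeExample is a hypothetical in-package helper, all other names come from this change):

    // Inside the azure package, assuming the types added by this patch.
    func writeExample(d *Driver, path string, offset int64, reader io.Reader) (int64, error) {
        bs := newAzureBlockStorage(d.client)                   // raw block-blob calls against the SDK client
        bw := newRandomBlobWriter(&bs, azure.MaxBlobBlockSize) // random-offset writes via block splitting
        zw := newZeroFillWriter(&bw)                           // zero-fills any gap past the blob's end
        return zw.Write(d.container, path, offset, reader)     // bytes written, excluding the zero fill
    }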
diff --git a/storagedriver/azure/blockblob.go b/storagedriver/azure/blockblob.go
new file mode 100644
index 000000000..d868453f1
--- /dev/null
+++ b/storagedriver/azure/blockblob.go
@@ -0,0 +1,24 @@
+package azure
+
+import (
+    "fmt"
+    "io"
+
+    azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
+)
+
+// azureBlockStorage is an adapter between azure.BlobStorageClient and
+// the blockStorage interface.
+type azureBlockStorage struct {
+    azure.BlobStorageClient
+}
+
+func (b *azureBlockStorage) GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) {
+    return b.BlobStorageClient.GetBlobRange(container, blob, fmt.Sprintf("%v-%v", start, start+length-1))
+}
+
+func newAzureBlockStorage(b azure.BlobStorageClient) azureBlockStorage {
+    a := azureBlockStorage{}
+    a.BlobStorageClient = b
+    return a
+}
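GetSectionReader translates a half-open section [start, start+length) into the inclusive "first-last" byte range that the blob range request expects. A standalone sanity check of that arithmetic (an editorial sketch, not part of the patch; rangeHeader is a hypothetical mirror of the fmt.Sprintf above):

    package main

    import "fmt"

    // rangeHeader mirrors GetSectionReader: an inclusive "start-end"
    // range string for a section of the given start and length.
    func rangeHeader(start, length int64) string {
        return fmt.Sprintf("%v-%v", start, start+length-1)
    }

    func main() {
        fmt.Println(rangeHeader(0, 5))  // "0-4": the first five bytes
        fmt.Println(rangeHeader(10, 1)) // "10-10": a single byte at offset 10
    }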
diff --git a/storagedriver/azure/blockblob_test.go b/storagedriver/azure/blockblob_test.go
new file mode 100644
index 000000000..f1e390277
--- /dev/null
+++ b/storagedriver/azure/blockblob_test.go
@@ -0,0 +1,155 @@
+package azure
+
+import (
+    "bytes"
+    "fmt"
+    "io"
+    "io/ioutil"
+
+    azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
+)
+
+type StorageSimulator struct {
+    blobs map[string]*BlockBlob
+}
+
+type BlockBlob struct {
+    blocks    map[string]*DataBlock
+    blockList []string
+}
+
+type DataBlock struct {
+    data      []byte
+    committed bool
+}
+
+func (s *StorageSimulator) path(container, blob string) string {
+    return fmt.Sprintf("%s/%s", container, blob)
+}
+
+func (s *StorageSimulator) BlobExists(container, blob string) (bool, error) {
+    _, ok := s.blobs[s.path(container, blob)]
+    return ok, nil
+}
+
+func (s *StorageSimulator) GetBlob(container, blob string) (io.ReadCloser, error) {
+    bb, ok := s.blobs[s.path(container, blob)]
+    if !ok {
+        return nil, fmt.Errorf("blob not found")
+    }
+
+    var readers []io.Reader
+    for _, bID := range bb.blockList {
+        readers = append(readers, bytes.NewReader(bb.blocks[bID].data))
+    }
+    return ioutil.NopCloser(io.MultiReader(readers...)), nil
+}
+
+func (s *StorageSimulator) GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) {
+    r, err := s.GetBlob(container, blob)
+    if err != nil {
+        return nil, err
+    }
+    b, err := ioutil.ReadAll(r)
+    if err != nil {
+        return nil, err
+    }
+    return ioutil.NopCloser(bytes.NewReader(b[start : start+length])), nil
+}
+
+func (s *StorageSimulator) CreateBlockBlob(container, blob string) error {
+    path := s.path(container, blob)
+    bb := &BlockBlob{
+        blocks:    make(map[string]*DataBlock),
+        blockList: []string{},
+    }
+    s.blobs[path] = bb
+    return nil
+}
+
+func (s *StorageSimulator) PutBlock(container, blob, blockID string, chunk []byte) error {
+    path := s.path(container, blob)
+    bb, ok := s.blobs[path]
+    if !ok {
+        return fmt.Errorf("blob not found")
+    }
+    data := make([]byte, len(chunk))
+    copy(data, chunk)
+    bb.blocks[blockID] = &DataBlock{data: data, committed: false} // add block to blob
+    return nil
+}
+
+func (s *StorageSimulator) GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error) {
+    resp := azure.BlockListResponse{}
+    bb, ok := s.blobs[s.path(container, blob)]
+    if !ok {
+        return resp, fmt.Errorf("blob not found")
+    }
+
+    // Iterate committed blocks (in order)
+    if blockType == azure.BlockListTypeAll || blockType == azure.BlockListTypeCommitted {
+        for _, blockID := range bb.blockList {
+            b := bb.blocks[blockID]
+            block := azure.BlockResponse{
+                Name: blockID,
+                Size: int64(len(b.data)),
+            }
+            resp.CommittedBlocks = append(resp.CommittedBlocks, block)
+        }
+    }
+
+    // Iterate uncommitted blocks (in no order)
+    if blockType == azure.BlockListTypeAll || blockType == azure.BlockListTypeUncommitted {
+        for blockID, b := range bb.blocks {
+            block := azure.BlockResponse{
+                Name: blockID,
+                Size: int64(len(b.data)),
+            }
+            if !b.committed {
+                resp.UncommittedBlocks = append(resp.UncommittedBlocks, block)
+            }
+        }
+    }
+    return resp, nil
+}
+
+func (s *StorageSimulator) PutBlockList(container, blob string, blocks []azure.Block) error {
+    bb, ok := s.blobs[s.path(container, blob)]
+    if !ok {
+        return fmt.Errorf("blob not found")
+    }
+
+    var blockIDs []string
+    for _, v := range blocks {
+        bl, ok := bb.blocks[v.Id]
+        if !ok { // check if block ID exists
+            return fmt.Errorf("Block id '%s' not found", v.Id)
+        }
+        bl.committed = true
+        blockIDs = append(blockIDs, v.Id)
+    }
+
+    // Mark all other blocks uncommitted
+    for k, b := range bb.blocks {
+        inList := false
+        for _, v := range blockIDs {
+            if k == v {
+                inList = true
+                break
+            }
+        }
+        if !inList {
+            b.committed = false
+        }
+    }
+
+    bb.blockList = blockIDs
+    return nil
+}
+
+func NewStorageSimulator() StorageSimulator {
+    return StorageSimulator{
+        blobs: make(map[string]*BlockBlob),
+    }
+}
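The simulator mirrors the service's two views of a blob: committed blocks come back in blob order, uncommitted blocks only until PutBlockList commits them. A small in-package sketch of that lifecycle (editorial, not part of the patch; listViewsExample is a hypothetical helper and assumes the test file's imports plus "fmt"):

    // Editorial sketch (azure package): committed vs. uncommitted views.
    func listViewsExample() error {
        s := NewStorageSimulator()
        if err := s.CreateBlockBlob("c", "b"); err != nil {
            return err
        }
        if err := s.PutBlock("c", "b", "id1", []byte("data")); err != nil {
            return err
        }
        // Not committed yet: visible only in the uncommitted view.
        all, err := s.GetBlockList("c", "b", azure.BlockListTypeAll)
        if err != nil {
            return err
        }
        fmt.Println(len(all.CommittedBlocks), len(all.UncommittedBlocks)) // 0 1

        // Committing via PutBlockList moves it to the committed view.
        if err := s.PutBlockList("c", "b", []azure.Block{{Id: "id1", Status: azure.BlockStatusUncommitted}}); err != nil {
            return err
        }
        committed, err := s.GetBlockList("c", "b", azure.BlockListTypeCommitted)
        if err != nil {
            return err
        }
        fmt.Println(len(committed.CommittedBlocks)) // 1
        return nil
    }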
+ } + return resp, nil +} + +func (s *StorageSimulator) PutBlockList(container, blob string, blocks []azure.Block) error { + bb, ok := s.blobs[s.path(container, blob)] + if !ok { + return fmt.Errorf("blob not found") + } + + var blockIDs []string + for _, v := range blocks { + bl, ok := bb.blocks[v.Id] + if !ok { // check if block ID exists + return fmt.Errorf("Block id '%s' not found", v.Id) + } + bl.committed = true + blockIDs = append(blockIDs, v.Id) + } + + // Mark all other blocks uncommitted + for k, b := range bb.blocks { + inList := false + for _, v := range blockIDs { + if k == v { + inList = true + break + } + } + if !inList { + b.committed = false + } + } + + bb.blockList = blockIDs + return nil +} + +func NewStorageSimulator() StorageSimulator { + return StorageSimulator{ + blobs: make(map[string]*BlockBlob), + } +} diff --git a/storagedriver/azure/blockid.go b/storagedriver/azure/blockid.go new file mode 100644 index 000000000..61f41ebcf --- /dev/null +++ b/storagedriver/azure/blockid.go @@ -0,0 +1,60 @@ +package azure + +import ( + "encoding/base64" + "fmt" + "math/rand" + "sync" + "time" + + azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage" +) + +type blockIDGenerator struct { + pool map[string]bool + r *rand.Rand + m sync.Mutex +} + +// Generate returns an unused random block id and adds the generated ID +// to list of used IDs so that the same block name is not used again. +func (b *blockIDGenerator) Generate() string { + b.m.Lock() + defer b.m.Unlock() + + var id string + for { + id = toBlockID(int(b.r.Int())) + if !b.exists(id) { + break + } + } + b.pool[id] = true + return id +} + +func (b *blockIDGenerator) exists(id string) bool { + _, used := b.pool[id] + return used +} + +func (b *blockIDGenerator) Feed(blocks azure.BlockListResponse) { + b.m.Lock() + defer b.m.Unlock() + + for _, bl := range append(blocks.CommittedBlocks, blocks.UncommittedBlocks...) { + b.pool[bl.Name] = true + } +} + +func newBlockIDGenerator() *blockIDGenerator { + return &blockIDGenerator{ + pool: make(map[string]bool), + r: rand.New(rand.NewSource(time.Now().UnixNano()))} +} + +// toBlockId converts given integer to base64-encoded block ID of a fixed length. 
diff --git a/storagedriver/azure/blockid_test.go b/storagedriver/azure/blockid_test.go
new file mode 100644
index 000000000..46d52a342
--- /dev/null
+++ b/storagedriver/azure/blockid_test.go
@@ -0,0 +1,74 @@
+package azure
+
+import (
+    "math"
+    "testing"
+
+    azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
+)
+
+func Test_blockIdGenerator(t *testing.T) {
+    r := newBlockIDGenerator()
+
+    for i := 1; i <= 10; i++ {
+        if expected := i - 1; len(r.pool) != expected {
+            t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
+        }
+        if id := r.Generate(); id == "" {
+            t.Fatal("returned empty id")
+        }
+        if expected := i; len(r.pool) != expected {
+            t.Fatalf("rand pool has wrong number of items: %d, expected:%d", len(r.pool), expected)
+        }
+    }
+}
+
+func Test_blockIdGenerator_Feed(t *testing.T) {
+    r := newBlockIDGenerator()
+    if expected := 0; len(r.pool) != expected {
+        t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
+    }
+
+    // feed empty list
+    blocks := azure.BlockListResponse{}
+    r.Feed(blocks)
+    if expected := 0; len(r.pool) != expected {
+        t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
+    }
+
+    // feed blocks
+    blocks = azure.BlockListResponse{
+        CommittedBlocks: []azure.BlockResponse{
+            {"1", 1},
+            {"2", 2},
+        },
+        UncommittedBlocks: []azure.BlockResponse{
+            {"3", 3},
+        }}
+    r.Feed(blocks)
+    if expected := 3; len(r.pool) != expected {
+        t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
+    }
+
+    // feed same block IDs with committed/uncommitted place changed
+    blocks = azure.BlockListResponse{
+        CommittedBlocks: []azure.BlockResponse{
+            {"3", 3},
+        },
+        UncommittedBlocks: []azure.BlockResponse{
+            {"1", 1},
+        }}
+    r.Feed(blocks)
+    if expected := 3; len(r.pool) != expected {
+        t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
+    }
+}
+
+func Test_toBlockId(t *testing.T) {
+    min := 0
+    max := math.MaxInt64
+
+    if len(toBlockID(min)) != len(toBlockID(max)) {
+        t.Fatalf("different-sized blockIDs are returned")
+    }
+}
diff --git a/storagedriver/azure/randomwriter.go b/storagedriver/azure/randomwriter.go
new file mode 100644
index 000000000..c89dd0a34
--- /dev/null
+++ b/storagedriver/azure/randomwriter.go
@@ -0,0 +1,208 @@
+package azure
+
+import (
+    "fmt"
+    "io"
+    "io/ioutil"
+
+    azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
+)
+
+// blockStorage is the interface required from a block storage service
+// client implementation
+type blockStorage interface {
+    CreateBlockBlob(container, blob string) error
+    GetBlob(container, blob string) (io.ReadCloser, error)
+    GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error)
+    PutBlock(container, blob, blockID string, chunk []byte) error
+    GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error)
+    PutBlockList(container, blob string, blocks []azure.Block) error
+}
+
+// randomBlobWriter enables random-access write semantics on Azure block
+// blobs by allowing chunks of arbitrary length to be written at arbitrary
+// offsets within the blob. Azure Blob Storage does not normally support
+// random access on block blobs; this writer downloads, splits, and reuploads
+// the overlapping blocks, discarding blocks that are overwritten entirely.
+type randomBlobWriter struct {
+    bs        blockStorage
+    blockSize int
+}
+
+func newRandomBlobWriter(bs blockStorage, blockSize int) randomBlobWriter {
+    return randomBlobWriter{bs: bs, blockSize: blockSize}
+}
+
+// WriteBlobAt writes the given chunk to the specified position of an existing blob.
+// The offset must be equal to or smaller than the current size of the blob.
+func (r *randomBlobWriter) WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error) {
+    rand := newBlockIDGenerator()
+
+    blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted)
+    if err != nil {
+        return 0, err
+    }
+    rand.Feed(blocks) // load existing block IDs
+
+    // Check the write offset against the existing blob size
+    size := getBlobSize(blocks)
+    if offset < 0 || offset > size {
+        return 0, fmt.Errorf("wrong offset for Write: %v", offset)
+    }
+
+    // Upload the new chunk as blocks
+    blockList, nn, err := r.writeChunkToBlocks(container, blob, chunk, rand)
+    if err != nil {
+        return 0, err
+    }
+
+    // For non-append operations, existing blocks may need to be split
+    if offset != size {
+        // Split the block on the left end (if any)
+        leftBlocks, err := r.blocksLeftSide(container, blob, offset, rand)
+        if err != nil {
+            return 0, err
+        }
+        blockList = append(leftBlocks, blockList...)
+
+        // Split the block on the right end (if any)
+        rightBlocks, err := r.blocksRightSide(container, blob, offset, nn, rand)
+        if err != nil {
+            return 0, err
+        }
+        blockList = append(blockList, rightBlocks...)
+    } else {
+        // Use existing block list
+        var existingBlocks []azure.Block
+        for _, v := range blocks.CommittedBlocks {
+            existingBlocks = append(existingBlocks, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
+        }
+        blockList = append(existingBlocks, blockList...)
+    }
+    // Put block list
+    return nn, r.bs.PutBlockList(container, blob, blockList)
+}
+
+func (r *randomBlobWriter) GetSize(container, blob string) (int64, error) {
+    blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted)
+    if err != nil {
+        return 0, err
+    }
+    return getBlobSize(blocks), nil
+}
+
+// writeChunkToBlocks writes the given chunk to one or more blocks within the
+// specified blob and returns their block representations; the blocks are not yet committed.
+func (r *randomBlobWriter) writeChunkToBlocks(container, blob string, chunk io.Reader, rand *blockIDGenerator) ([]azure.Block, int64, error) {
+    var newBlocks []azure.Block
+    var nn int64
+
+    // Read chunks of at most size N except the last chunk to
+    // maximize block size and minimize block count.
+    buf := make([]byte, r.blockSize)
+    for {
+        n, err := io.ReadFull(chunk, buf)
+        if err == io.EOF {
+            break
+        }
+        nn += int64(n)
+        data := buf[:n]
+        blockID := rand.Generate()
+        if err := r.bs.PutBlock(container, blob, blockID, data); err != nil {
+            return newBlocks, nn, err
+        }
+        newBlocks = append(newBlocks, azure.Block{Id: blockID, Status: azure.BlockStatusUncommitted})
+    }
+    return newBlocks, nn, nil
+}
+// blocksLeftSide returns the blocks that are going to be at the left side of
+// the writeOffset: [0, writeOffset), by identifying blocks that remain intact
+// and splitting and reuploading the boundary block if needed.
+func (r *randomBlobWriter) blocksLeftSide(container, blob string, writeOffset int64, rand *blockIDGenerator) ([]azure.Block, error) {
+    var left []azure.Block
+    bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll)
+    if err != nil {
+        return left, err
+    }
+
+    o := writeOffset
+    elapsed := int64(0)
+    for _, v := range bx.CommittedBlocks {
+        blkSize := int64(v.Size)
+        if o >= blkSize { // use existing block
+            left = append(left, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
+            o -= blkSize
+            elapsed += blkSize
+        } else if o > 0 { // current block needs to be split
+            start := elapsed
+            size := o
+            part, err := r.bs.GetSectionReader(container, blob, start, size)
+            if err != nil {
+                return left, err
+            }
+            newBlockID := rand.Generate()
+
+            data, err := ioutil.ReadAll(part)
+            if err != nil {
+                return left, err
+            }
+            if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil {
+                return left, err
+            }
+            left = append(left, azure.Block{Id: newBlockID, Status: azure.BlockStatusUncommitted})
+            break
+        }
+    }
+    return left, nil
+}
+
+// blocksRightSide returns the blocks that are going to be at the right side of
+// the written chunk: [writeOffset+size, +inf), by identifying blocks that remain
+// intact and splitting and reuploading the boundary block if needed.
+func (r *randomBlobWriter) blocksRightSide(container, blob string, writeOffset int64, chunkSize int64, rand *blockIDGenerator) ([]azure.Block, error) {
+    var right []azure.Block
+
+    bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll)
+    if err != nil {
+        return nil, err
+    }
+
+    re := writeOffset + chunkSize - 1 // right end of written chunk
+    var elapsed int64
+    for _, v := range bx.CommittedBlocks {
+        var (
+            bs = elapsed                     // left end of current block
+            be = elapsed + int64(v.Size) - 1 // right end of current block
+        )
+
+        if bs > re { // take the block as is
+            right = append(right, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
+        } else if be > re { // current block needs to be split
+            part, err := r.bs.GetSectionReader(container, blob, re+1, be-(re+1)+1)
+            if err != nil {
+                return right, err
+            }
+            newBlockID := rand.Generate()
+
+            data, err := ioutil.ReadAll(part)
+            if err != nil {
+                return right, err
+            }
+            if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil {
+                return right, err
+            }
+            right = append(right, azure.Block{Id: newBlockID, Status: azure.BlockStatusUncommitted})
+        }
+        elapsed += int64(v.Size)
+    }
+    return right, nil
+}
+
+func getBlobSize(blocks azure.BlockListResponse) int64 {
+    var n int64
+    for _, v := range blocks.CommittedBlocks {
+        n += int64(v.Size)
+    }
+    return n
+}
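The left/right split arithmetic is easiest to see with concrete numbers. The following standalone sketch (editorial, not part of the patch; splitPlan is a hypothetical pure-function mirror of blocksLeftSide/blocksRightSide that prints a plan instead of touching storage) walks the committed block layout used by the tests below:

    package main

    import "fmt"

    // splitPlan reports, for a write covering [offset, offset+n), which
    // committed blocks (given by size) are kept whole on each side, which
    // bytes of a straddling block must be re-uploaded, and which blocks
    // are discarded because they are fully overwritten.
    func splitPlan(blockSizes []int64, offset, n int64) {
        var elapsed int64
        re := offset + n - 1 // right end of the written chunk, inclusive
        for i, size := range blockSizes {
            bs, be := elapsed, elapsed+size-1 // inclusive bounds of this block
            switch {
            case be < offset: // entirely left of the write: kept as-is
                fmt.Printf("block %d: keep (left)\n", i)
            case bs > re: // entirely right of the write: kept as-is
                fmt.Printf("block %d: keep (right)\n", i)
            default: // overlaps the write
                if bs < offset { // left part survives
                    fmt.Printf("block %d: re-upload bytes [%d,%d)\n", i, bs, offset)
                }
                if be > re { // right part survives
                    fmt.Printf("block %d: re-upload bytes [%d,%d]\n", i, re+1, be)
                }
                if bs >= offset && be <= re {
                    fmt.Printf("block %d: discard (fully overwritten)\n", i)
                }
            }
            elapsed += size
        }
    }

    func main() {
        // Blocks "AAAAA", "BBBBB", "CCC" with an 8-byte write at offset 3,
        // as in the blocksRightSide test table: keeps bytes [0,3) of the
        // first block, discards the middle block, keeps "CC" at [11,12].
        splitPlan([]int64{5, 5, 3}, 3, 8)
    }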
diff --git a/storagedriver/azure/randomwriter_test.go b/storagedriver/azure/randomwriter_test.go
new file mode 100644
index 000000000..5201e3b49
--- /dev/null
+++ b/storagedriver/azure/randomwriter_test.go
@@ -0,0 +1,339 @@
+package azure
+
+import (
+    "bytes"
+    "io"
+    "io/ioutil"
+    "math/rand"
+    "reflect"
+    "strings"
+    "testing"
+
+    azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
+)
+
+func TestRandomWriter_writeChunkToBlocks(t *testing.T) {
+    s := NewStorageSimulator()
+    rw := newRandomBlobWriter(&s, 3)
+    rand := newBlockIDGenerator()
+    c := []byte("AAABBBCCCD")
+
+    if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    }
+    bw, nn, err := rw.writeChunkToBlocks("a", "b", bytes.NewReader(c), rand)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if expected := int64(len(c)); nn != expected {
+        t.Fatalf("wrong nn:%v, expected:%v", nn, expected)
+    }
+    if expected := 4; len(bw) != expected {
+        t.Fatal("unexpected written block count")
+    }
+
+    bx, err := s.GetBlockList("a", "b", azure.BlockListTypeAll)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if expected := 0; len(bx.CommittedBlocks) != expected {
+        t.Fatal("unexpected committed block count")
+    }
+    if expected := 4; len(bx.UncommittedBlocks) != expected {
+        t.Fatalf("unexpected uncommitted block count: %d -- %#v", len(bx.UncommittedBlocks), bx)
+    }
+
+    if err := rw.bs.PutBlockList("a", "b", bw); err != nil {
+        t.Fatal(err)
+    }
+
+    r, err := rw.bs.GetBlob("a", "b")
+    if err != nil {
+        t.Fatal(err)
+    }
+    assertBlobContents(t, r, c)
+}
+
+func TestRandomWriter_blocksLeftSide(t *testing.T) {
+    blob := "AAAAABBBBBCCC"
+    cases := []struct {
+        offset          int64
+        expectedBlob    string
+        expectedPattern []azure.BlockStatus
+    }{
+        {0, "", []azure.BlockStatus{}}, // write to beginning, discard all
+        {13, blob, []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // write to end, no change
+        {1, "A", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // write at 1
+        {5, "AAAAA", []azure.BlockStatus{azure.BlockStatusCommitted}}, // write just after first block
+        {6, "AAAAAB", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusUncommitted}}, // split the second block
+        {9, "AAAAABBBB", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusUncommitted}}, // split the second block near its end
+    }
+
+    for _, c := range cases {
+        s := NewStorageSimulator()
+        rw := newRandomBlobWriter(&s, 5)
+        rand := newBlockIDGenerator()
+
+        if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
+            t.Fatal(err)
+        }
+        bw, _, err := rw.writeChunkToBlocks("a", "b", strings.NewReader(blob), rand)
+        if err != nil {
+            t.Fatal(err)
+        }
+        if err := rw.bs.PutBlockList("a", "b", bw); err != nil {
+            t.Fatal(err)
+        }
+        bx, err := rw.blocksLeftSide("a", "b", c.offset, rand)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        bs := []azure.BlockStatus{}
+        for _, v := range bx {
+            bs = append(bs, v.Status)
+        }
+
+        if !reflect.DeepEqual(bs, c.expectedPattern) {
+            t.Logf("Committed blocks %v", bw)
+            t.Fatalf("For offset %v: Expected pattern: %v, Got: %v\n(Returned: %v)", c.offset, c.expectedPattern, bs, bx)
+        }
+        if err := rw.bs.PutBlockList("a", "b", bx); err != nil {
+            t.Fatal(err)
+        }
+        r, err := rw.bs.GetBlob("a", "b")
+        if err != nil {
+            t.Fatal(err)
+        }
+        cout, err := ioutil.ReadAll(r)
+        if err != nil {
+            t.Fatal(err)
+        }
+        outBlob := string(cout)
+        if outBlob != c.expectedBlob {
+            t.Fatalf("wrong blob contents: %v, expected: %v", outBlob, c.expectedBlob)
+        }
+    }
+}
+func TestRandomWriter_blocksRightSide(t *testing.T) {
+    blob := "AAAAABBBBBCCC"
+    cases := []struct {
+        offset          int64
+        size            int64
+        expectedBlob    string
+        expectedPattern []azure.BlockStatus
+    }{
+        {0, 100, "", []azure.BlockStatus{}}, // overwrite the entire blob
+        {0, 3, "AABBBBBCCC", []azure.BlockStatus{azure.BlockStatusUncommitted, azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // split first block
+        {4, 1, "BBBBBCCC", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // write to last char of first block
+        {1, 6, "BBBCCC", []azure.BlockStatus{azure.BlockStatusUncommitted, azure.BlockStatusCommitted}}, // overwrite splits first and second block, last block remains
+        {3, 8, "CC", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // overwrite middle block entirely, split last block
+        {10, 1, "CC", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // overwrite first byte of rightmost block
+        {11, 2, "", []azure.BlockStatus{}}, // overwrite through the end of the blob
+        {13, 20, "", []azure.BlockStatus{}}, // append to the end
+    }
+
+    for _, c := range cases {
+        s := NewStorageSimulator()
+        rw := newRandomBlobWriter(&s, 5)
+        rand := newBlockIDGenerator()
+
+        if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
+            t.Fatal(err)
+        }
+        bw, _, err := rw.writeChunkToBlocks("a", "b", strings.NewReader(blob), rand)
+        if err != nil {
+            t.Fatal(err)
+        }
+        if err := rw.bs.PutBlockList("a", "b", bw); err != nil {
+            t.Fatal(err)
+        }
+        bx, err := rw.blocksRightSide("a", "b", c.offset, c.size, rand)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        bs := []azure.BlockStatus{}
+        for _, v := range bx {
+            bs = append(bs, v.Status)
+        }
+
+        if !reflect.DeepEqual(bs, c.expectedPattern) {
+            t.Logf("Committed blocks %v", bw)
+            t.Fatalf("For offset %v-size:%v: Expected pattern: %v, Got: %v\n(Returned: %v)", c.offset, c.size, c.expectedPattern, bs, bx)
+        }
+        if err := rw.bs.PutBlockList("a", "b", bx); err != nil {
+            t.Fatal(err)
+        }
+        r, err := rw.bs.GetBlob("a", "b")
+        if err != nil {
+            t.Fatal(err)
+        }
+        cout, err := ioutil.ReadAll(r)
+        if err != nil {
+            t.Fatal(err)
+        }
+        outBlob := string(cout)
+        if outBlob != c.expectedBlob {
+            t.Fatalf("For offset %v-size:%v: wrong blob contents: %v, expected: %v", c.offset, c.size, outBlob, c.expectedBlob)
+        }
+    }
+}
+func TestRandomWriter_Write_NewBlob(t *testing.T) {
+    var (
+        s    = NewStorageSimulator()
+        rw   = newRandomBlobWriter(&s, 1024*3) // 3 KB blocks
+        blob = randomContents(1024 * 7)        // 7 KB blob
+    )
+    if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    }
+
+    if _, err := rw.WriteBlobAt("a", "b", 10, bytes.NewReader(blob)); err == nil {
+        t.Fatal("expected error, got nil")
+    }
+    if _, err := rw.WriteBlobAt("a", "b", 100000, bytes.NewReader(blob)); err == nil {
+        t.Fatal("expected error, got nil")
+    }
+    if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(blob)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(blob)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := rw.bs.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, blob)
+    }
+    if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
+        t.Fatal(err)
+    } else if len(bx.CommittedBlocks) != 3 {
+        t.Fatalf("got wrong number of committed blocks: %v", len(bx.CommittedBlocks))
+    }
+
+    // Replace first 512 bytes
+    leftChunk := randomContents(512)
+    blob = append(leftChunk, blob[512:]...)
+    if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(leftChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(leftChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := rw.bs.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, blob)
+    }
+    if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
+        t.Fatal(err)
+    } else if expected := 4; len(bx.CommittedBlocks) != expected {
+        t.Fatalf("got wrong number of committed blocks: %v, expected: %v", len(bx.CommittedBlocks), expected)
+    }
+
+    // Replace last 512 bytes with 1024 bytes
+    rightChunk := randomContents(1024)
+    offset := int64(len(blob) - 512)
+    blob = append(blob[:offset], rightChunk...)
+    if nn, err := rw.WriteBlobAt("a", "b", offset, bytes.NewReader(rightChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(rightChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := rw.bs.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, blob)
+    }
+    if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
+        t.Fatal(err)
+    } else if expected := 5; len(bx.CommittedBlocks) != expected {
+        t.Fatalf("got wrong number of committed blocks: %v, expected: %v", len(bx.CommittedBlocks), expected)
+    }
+
+    // Replace 2K-4K (overlaps 2 blocks from L/R)
+    newChunk := randomContents(1024 * 2)
+    offset = 1024 * 2
+    blob = append(append(blob[:offset], newChunk...), blob[offset+int64(len(newChunk)):]...)
+    if nn, err := rw.WriteBlobAt("a", "b", offset, bytes.NewReader(newChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(newChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := rw.bs.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, blob)
+    }
+    if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
+        t.Fatal(err)
+    } else if expected := 6; len(bx.CommittedBlocks) != expected {
+        t.Fatalf("got wrong number of committed blocks: %v, expected: %v\n%v", len(bx.CommittedBlocks), expected, bx.CommittedBlocks)
+    }
+
+    // Replace the entire blob
+    newBlob := randomContents(1024 * 30)
+    if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(newBlob)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(newBlob)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := rw.bs.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, newBlob)
+    }
+    if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
+        t.Fatal(err)
+    } else if expected := 10; len(bx.CommittedBlocks) != expected {
+        t.Fatalf("got wrong number of committed blocks: %v, expected: %v\n%v", len(bx.CommittedBlocks), expected, bx.CommittedBlocks)
+    } else if expected, size := int64(1024*30), getBlobSize(bx); size != expected {
+        t.Fatalf("committed block size does not indicate blob size")
+    }
+}
+
+func Test_getBlobSize(t *testing.T) {
+    // with some committed blocks
+    if expected, size := int64(151), getBlobSize(azure.BlockListResponse{
+        CommittedBlocks: []azure.BlockResponse{
+            {"A", 100},
+            {"B", 50},
+            {"C", 1},
+        },
+        UncommittedBlocks: []azure.BlockResponse{
+            {"D", 200},
+        }}); expected != size {
+        t.Fatalf("wrong blob size: %v, expected: %v", size, expected)
+    }
+
+    // with no committed blocks
+    if expected, size := int64(0), getBlobSize(azure.BlockListResponse{
+        UncommittedBlocks: []azure.BlockResponse{
+            {"A", 100},
+            {"B", 50},
+            {"C", 1},
+            {"D", 200},
+        }}); expected != size {
+        t.Fatalf("wrong blob size: %v, expected: %v", size, expected)
+    }
+}
+
+func assertBlobContents(t *testing.T, r io.Reader, expected []byte) {
+    out, err := ioutil.ReadAll(r)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if !reflect.DeepEqual(out, expected) {
+        t.Fatalf("wrong blob contents. size: %v, expected: %v", len(out), len(expected))
+    }
+}
+
+func randomContents(length int64) []byte {
+    b := make([]byte, length)
+    for i := range b {
+        b[i] = byte(rand.Intn(256)) // uniform random byte
+    }
+    return b
+}
diff --git a/storagedriver/azure/zerofillwriter.go b/storagedriver/azure/zerofillwriter.go
new file mode 100644
index 000000000..095489d22
--- /dev/null
+++ b/storagedriver/azure/zerofillwriter.go
@@ -0,0 +1,49 @@
+package azure
+
+import (
+    "bytes"
+    "io"
+)
+
+type blockBlobWriter interface {
+    GetSize(container, blob string) (int64, error)
+    WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error)
+}
+
+// zeroFillWriter enables writing to an offset outside a block blob's size
+// by offering the chunk to the underlying writer as contiguous data, with
+// the gap in between filled with NUL (zero) bytes.
+type zeroFillWriter struct {
+    blockBlobWriter
+}
+
+func newZeroFillWriter(b blockBlobWriter) zeroFillWriter {
+    w := zeroFillWriter{}
+    w.blockBlobWriter = b
+    return w
+}
+
+// Write writes the given chunk to the specified existing blob, even if the
+// offset is beyond the blob's current size. Any gap is filled with zeros.
+// The returned byte count does not include the zeros written.
+func (z *zeroFillWriter) Write(container, blob string, offset int64, chunk io.Reader) (int64, error) {
+    size, err := z.blockBlobWriter.GetSize(container, blob)
+    if err != nil {
+        return 0, err
+    }
+
+    var reader io.Reader
+    var zeroPadding int64
+    if offset <= size {
+        reader = chunk
+    } else {
+        zeroPadding = offset - size
+        offset = size // adjust offset to be the append index
+        zeros := bytes.NewReader(make([]byte, zeroPadding))
+        reader = io.MultiReader(zeros, chunk)
+    }
+
+    nn, err := z.blockBlobWriter.WriteBlobAt(container, blob, offset, reader)
+    nn -= zeroPadding
+    return nn, err
+}
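The gap arithmetic in Write is small but easy to get off by one. A standalone check of the same construction (an editorial sketch, not part of the patch; the sizes are made-up example values):

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "io/ioutil"
    )

    func main() {
        // The blob currently ends at size 100; the caller writes 5 bytes at offset 130.
        size, offset := int64(100), int64(130)
        chunk := bytes.NewReader([]byte("hello"))

        // Same construction zeroFillWriter.Write uses: 30 zero bytes, then the chunk,
        // handed to the underlying writer as one contiguous stream at offset 100.
        padding := offset - size
        reader := io.MultiReader(bytes.NewReader(make([]byte, padding)), chunk)

        data, _ := ioutil.ReadAll(reader)
        written := int64(len(data)) - padding // zeros are excluded from the reported count
        fmt.Println(len(data), written)       // 35 5
    }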
diff --git a/storagedriver/azure/zerofillwriter_test.go b/storagedriver/azure/zerofillwriter_test.go
new file mode 100644
index 000000000..49361791a
--- /dev/null
+++ b/storagedriver/azure/zerofillwriter_test.go
@@ -0,0 +1,126 @@
+package azure
+
+import (
+    "bytes"
+    "testing"
+)
+
+func Test_zeroFillWrite_AppendNoGap(t *testing.T) {
+    s := NewStorageSimulator()
+    bw := newRandomBlobWriter(&s, 1024*1)
+    zw := newZeroFillWriter(&bw)
+    if err := s.CreateBlockBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    }
+
+    firstChunk := randomContents(1024*3 + 512)
+    if nn, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(firstChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, firstChunk)
+    }
+
+    secondChunk := randomContents(256)
+    if nn, err := zw.Write("a", "b", int64(len(firstChunk)), bytes.NewReader(secondChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(secondChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, append(firstChunk, secondChunk...))
+    }
+}
+
+func Test_zeroFillWrite_StartWithGap(t *testing.T) {
+    s := NewStorageSimulator()
+    bw := newRandomBlobWriter(&s, 1024*2)
+    zw := newZeroFillWriter(&bw)
+    if err := s.CreateBlockBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    }
+
+    chunk := randomContents(1024 * 5)
+    padding := int64(1024*2 + 256)
+    if nn, err := zw.Write("a", "b", padding, bytes.NewReader(chunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(chunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, append(make([]byte, padding), chunk...))
+    }
+}
+
+func Test_zeroFillWrite_AppendWithGap(t *testing.T) {
+    s := NewStorageSimulator()
+    bw := newRandomBlobWriter(&s, 1024*2)
+    zw := newZeroFillWriter(&bw)
+    if err := s.CreateBlockBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    }
+
+    firstChunk := randomContents(1024*3 + 512)
+    if _, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil {
+        t.Fatal(err)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, firstChunk)
+    }
+
+    secondChunk := randomContents(256)
+    padding := int64(1024 * 4)
+    if nn, err := zw.Write("a", "b", int64(len(firstChunk))+padding, bytes.NewReader(secondChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(secondChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, append(firstChunk, append(make([]byte, padding), secondChunk...)...))
+    }
+}
+
+func Test_zeroFillWrite_LiesWithinSize(t *testing.T) {
+    s := NewStorageSimulator()
+    bw := newRandomBlobWriter(&s, 1024*2)
+    zw := newZeroFillWriter(&bw)
+    if err := s.CreateBlockBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    }
+
+    firstChunk := randomContents(1024 * 3)
+    if _, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil {
+        t.Fatal(err)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, firstChunk)
+    }
+
+    // in this case, zero-fill won't be used
+    secondChunk := randomContents(256)
+    if nn, err := zw.Write("a", "b", 0, bytes.NewReader(secondChunk)); err != nil {
+        t.Fatal(err)
+    } else if expected := int64(len(secondChunk)); expected != nn {
+        t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
+    }
+    if out, err := s.GetBlob("a", "b"); err != nil {
+        t.Fatal(err)
+    } else {
+        assertBlobContents(t, out, append(secondChunk, firstChunk[len(secondChunk):]...))
+    }
+}