azureblob: fix data corruption bug #7590
It was reported that rclone copy occasionally uploaded corrupted data
to azure blob.
This turned out to be a race condition updating the block count which
caused blocks to be duplicated.
This bug was introduced in this commit in v1.64.0 and will be fixed in v1.65.2
0427177857
azureblob: implement OpenChunkWriter and multi-thread uploads #7056
This race only seems to happen if `--checksum` is used but can happen otherwise.
Unfortunately Azure blob does not check the MD5 that we send them so
despite sending incorrect data this corruption is not detected. The
corruption is detected when rclone tries to download the file, so
attempting to copy the files back to local disk will result in errors
such as:
ERROR : file.pokosuf5.partial: corrupted on transfer: md5 hash differ "XXX" vs "YYY"
This adds a check to test the blocklist we upload is as we expected
which would have caught the problem had it been in place earlier.
This commit is contained in:
parent
b3a1f66759
commit
03295bbc3c
2 changed files with 25 additions and 43 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -1985,21 +1986,9 @@ func (rs *readSeekCloser) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// increment the array as LSB binary
|
|
||||||
func increment(xs *[8]byte) {
|
|
||||||
for i, digit := range xs {
|
|
||||||
newDigit := digit + 1
|
|
||||||
xs[i] = newDigit
|
|
||||||
if newDigit >= digit {
|
|
||||||
// exit if no carry
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// record chunk number and id for Close
|
// record chunk number and id for Close
|
||||||
type azBlock struct {
|
type azBlock struct {
|
||||||
chunkNumber int
|
chunkNumber uint64
|
||||||
id string
|
id string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2011,7 +2000,6 @@ type azChunkWriter struct {
|
||||||
ui uploadInfo
|
ui uploadInfo
|
||||||
blocksMu sync.Mutex // protects the below
|
blocksMu sync.Mutex // protects the below
|
||||||
blocks []azBlock // list of blocks for finalize
|
blocks []azBlock // list of blocks for finalize
|
||||||
binaryBlockID [8]byte // block counter as LSB first 8 bytes
|
|
||||||
o *Object
|
o *Object
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2100,13 +2088,14 @@ func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader
|
||||||
transactionalMD5 := md5sum[:]
|
transactionalMD5 := md5sum[:]
|
||||||
|
|
||||||
// increment the blockID and save the blocks for finalize
|
// increment the blockID and save the blocks for finalize
|
||||||
increment(&w.binaryBlockID)
|
var binaryBlockID [8]byte // block counter as LSB first 8 bytes
|
||||||
blockID := base64.StdEncoding.EncodeToString(w.binaryBlockID[:])
|
binary.LittleEndian.PutUint64(binaryBlockID[:], uint64(chunkNumber))
|
||||||
|
blockID := base64.StdEncoding.EncodeToString(binaryBlockID[:])
|
||||||
|
|
||||||
// Save the blockID for the commit
|
// Save the blockID for the commit
|
||||||
w.blocksMu.Lock()
|
w.blocksMu.Lock()
|
||||||
w.blocks = append(w.blocks, azBlock{
|
w.blocks = append(w.blocks, azBlock{
|
||||||
chunkNumber: chunkNumber,
|
chunkNumber: uint64(chunkNumber),
|
||||||
id: blockID,
|
id: blockID,
|
||||||
})
|
})
|
||||||
w.blocksMu.Unlock()
|
w.blocksMu.Unlock()
|
||||||
|
@ -2171,9 +2160,20 @@ func (w *azChunkWriter) Close(ctx context.Context) (err error) {
|
||||||
return w.blocks[i].chunkNumber < w.blocks[j].chunkNumber
|
return w.blocks[i].chunkNumber < w.blocks[j].chunkNumber
|
||||||
})
|
})
|
||||||
|
|
||||||
// Create a list of block IDs
|
// Create and check a list of block IDs
|
||||||
blockIDs := make([]string, len(w.blocks))
|
blockIDs := make([]string, len(w.blocks))
|
||||||
for i := range w.blocks {
|
for i := range w.blocks {
|
||||||
|
if w.blocks[i].chunkNumber != uint64(i) {
|
||||||
|
return fmt.Errorf("internal error: expecting chunkNumber %d but got %d", i, w.blocks[i].chunkNumber)
|
||||||
|
}
|
||||||
|
chunkBytes, err := base64.StdEncoding.DecodeString(w.blocks[i].id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("internal error: bad block ID: %w", err)
|
||||||
|
}
|
||||||
|
chunkNumber := binary.LittleEndian.Uint64(chunkBytes)
|
||||||
|
if w.blocks[i].chunkNumber != chunkNumber {
|
||||||
|
return fmt.Errorf("internal error: expecting decoded chunkNumber %d but got %d", w.blocks[i].chunkNumber, chunkNumber)
|
||||||
|
}
|
||||||
blockIDs[i] = w.blocks[i].id
|
blockIDs[i] = w.blocks[i].id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,21 +17,3 @@ func (f *Fs) InternalTest(t *testing.T) {
|
||||||
enabled = f.Features().GetTier
|
enabled = f.Features().GetTier
|
||||||
assert.True(t, enabled)
|
assert.True(t, enabled)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIncrement(t *testing.T) {
|
|
||||||
for _, test := range []struct {
|
|
||||||
in [8]byte
|
|
||||||
want [8]byte
|
|
||||||
}{
|
|
||||||
{[8]byte{0, 0, 0, 0}, [8]byte{1, 0, 0, 0}},
|
|
||||||
{[8]byte{0xFE, 0, 0, 0}, [8]byte{0xFF, 0, 0, 0}},
|
|
||||||
{[8]byte{0xFF, 0, 0, 0}, [8]byte{0, 1, 0, 0}},
|
|
||||||
{[8]byte{0, 1, 0, 0}, [8]byte{1, 1, 0, 0}},
|
|
||||||
{[8]byte{0xFF, 0xFF, 0xFF, 0xFE}, [8]byte{0, 0, 0, 0xFF}},
|
|
||||||
{[8]byte{0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 1}},
|
|
||||||
{[8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 0, 0, 0}},
|
|
||||||
} {
|
|
||||||
increment(&test.in)
|
|
||||||
assert.Equal(t, test.want, test.in)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue