Merge pull request #1636 from kurin/pack-header
Refactor the eager-header reads for readability.
This commit is contained in:
commit
dfd37afee2
2 changed files with 119 additions and 72 deletions
|
@ -170,26 +170,73 @@ func (p *Packer) String() string {
|
||||||
return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
|
return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
const maxHeaderSize = 16 * 1024 * 1024
|
var (
|
||||||
|
// size of the header-length field at the end of the file
|
||||||
|
headerLengthSize = binary.Size(uint32(0))
|
||||||
|
// we require at least one entry in the header, and one blob for a pack file
|
||||||
|
minFileSize = entrySize + crypto.Extension + uint(headerLengthSize)
|
||||||
|
)
|
||||||
|
|
||||||
// size of the header-length field at the end of the file
|
const (
|
||||||
var headerLengthSize = binary.Size(uint32(0))
|
maxHeaderSize = 16 * 1024 * 1024
|
||||||
|
// number of header enries to download as part of header-length request
|
||||||
|
eagerEntries = 15
|
||||||
|
)
|
||||||
|
|
||||||
// we require at least one entry in the header, and one blob for a pack file
|
// readRecords reads up to max records from the underlying ReaderAt, returning
|
||||||
var minFileSize = entrySize + crypto.Extension + uint(headerLengthSize)
|
// the raw header, the total number of records in the header, and any error.
|
||||||
|
// If the header contains fewer than max entries, the header is truncated to
|
||||||
|
// the appropriate size.
|
||||||
|
func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
|
||||||
|
var bufsize int
|
||||||
|
bufsize += max * int(entrySize)
|
||||||
|
bufsize += crypto.Extension
|
||||||
|
bufsize += headerLengthSize
|
||||||
|
|
||||||
// number of header enries to download as part of header-length request
|
if bufsize > int(size) {
|
||||||
var eagerEntries = uint(15)
|
bufsize = int(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
b := make([]byte, bufsize)
|
||||||
|
off := size - int64(bufsize)
|
||||||
|
if _, err := rd.ReadAt(b, off); err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
hlen := binary.LittleEndian.Uint32(b[len(b)-headerLengthSize:])
|
||||||
|
b = b[:len(b)-headerLengthSize]
|
||||||
|
debug.Log("header length: %v", hlen)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
switch {
|
||||||
|
case hlen == 0:
|
||||||
|
err = InvalidFileError{Message: "header length is zero"}
|
||||||
|
case hlen < crypto.Extension:
|
||||||
|
err = InvalidFileError{Message: "header length is too small"}
|
||||||
|
case (hlen-crypto.Extension)%uint32(entrySize) != 0:
|
||||||
|
err = InvalidFileError{Message: "header length is invalid"}
|
||||||
|
case int64(hlen) > size-int64(headerLengthSize):
|
||||||
|
err = InvalidFileError{Message: "header is larger than file"}
|
||||||
|
case int64(hlen) > maxHeaderSize:
|
||||||
|
err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, errors.Wrap(err, "readHeader")
|
||||||
|
}
|
||||||
|
|
||||||
|
total := (int(hlen) - crypto.Extension) / int(entrySize)
|
||||||
|
if total < max {
|
||||||
|
// truncate to the beginning of the pack header
|
||||||
|
b = b[len(b)-int(hlen):]
|
||||||
|
}
|
||||||
|
|
||||||
|
return b, total, nil
|
||||||
|
}
|
||||||
|
|
||||||
// readHeader reads the header at the end of rd. size is the length of the
|
// readHeader reads the header at the end of rd. size is the length of the
|
||||||
// whole data accessible in rd.
|
// whole data accessible in rd.
|
||||||
func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
|
func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
|
||||||
debug.Log("size: %v", size)
|
debug.Log("size: %v", size)
|
||||||
if size == 0 {
|
|
||||||
err := InvalidFileError{Message: "file is empty"}
|
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
|
||||||
}
|
|
||||||
|
|
||||||
if size < int64(minFileSize) {
|
if size < int64(minFileSize) {
|
||||||
err := InvalidFileError{Message: "file is too small"}
|
err := InvalidFileError{Message: "file is too small"}
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
return nil, errors.Wrap(err, "readHeader")
|
||||||
|
@ -199,69 +246,19 @@ func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
|
||||||
// eagerly download eagerEntries header entries as part of header-length request.
|
// eagerly download eagerEntries header entries as part of header-length request.
|
||||||
// only make second request if actual number of entries is greater than eagerEntries
|
// only make second request if actual number of entries is greater than eagerEntries
|
||||||
|
|
||||||
eagerHl := uint32((eagerEntries * entrySize) + crypto.Extension)
|
b, c, err := readRecords(rd, size, eagerEntries)
|
||||||
if int64(eagerHl)+int64(headerLengthSize) > size {
|
|
||||||
eagerHl = uint32(size) - uint32(headerLengthSize)
|
|
||||||
}
|
|
||||||
eagerBuf := make([]byte, eagerHl+uint32(headerLengthSize))
|
|
||||||
|
|
||||||
n, err := rd.ReadAt(eagerBuf, size-int64(len(eagerBuf)))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if n != len(eagerBuf) {
|
if c <= eagerEntries {
|
||||||
return nil, errors.New("not enough bytes read")
|
// eager read sufficed, return what we got
|
||||||
|
return b, nil
|
||||||
}
|
}
|
||||||
|
b, _, err = readRecords(rd, size, c)
|
||||||
hl := binary.LittleEndian.Uint32(eagerBuf[eagerHl:])
|
if err != nil {
|
||||||
debug.Log("header length: %v", size)
|
return nil, err
|
||||||
|
|
||||||
if hl == 0 {
|
|
||||||
err := InvalidFileError{Message: "header length is zero"}
|
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
|
||||||
}
|
}
|
||||||
|
return b, nil
|
||||||
if hl < crypto.Extension {
|
|
||||||
err := InvalidFileError{Message: "header length is too small"}
|
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hl-crypto.Extension)%uint32(entrySize) != 0 {
|
|
||||||
err := InvalidFileError{Message: "header length is invalid"}
|
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
|
||||||
}
|
|
||||||
|
|
||||||
if int64(hl) > size-int64(headerLengthSize) {
|
|
||||||
err := InvalidFileError{Message: "header is larger than file"}
|
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
|
||||||
}
|
|
||||||
|
|
||||||
if int64(hl) > maxHeaderSize {
|
|
||||||
err := InvalidFileError{Message: "header is larger than maxHeaderSize"}
|
|
||||||
return nil, errors.Wrap(err, "readHeader")
|
|
||||||
}
|
|
||||||
|
|
||||||
eagerBuf = eagerBuf[:eagerHl]
|
|
||||||
|
|
||||||
var buf []byte
|
|
||||||
if hl <= eagerHl {
|
|
||||||
// already have all header bytes. yay.
|
|
||||||
buf = eagerBuf[eagerHl-hl:]
|
|
||||||
} else {
|
|
||||||
// need more header bytes
|
|
||||||
buf = make([]byte, hl)
|
|
||||||
missingHl := hl - eagerHl
|
|
||||||
n, err := rd.ReadAt(buf[:missingHl], size-int64(hl)-int64(headerLengthSize))
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "ReadAt")
|
|
||||||
}
|
|
||||||
if uint32(n) != missingHl {
|
|
||||||
return nil, errors.New("not enough bytes read")
|
|
||||||
}
|
|
||||||
copy(buf[hl-eagerHl:], eagerBuf)
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// InvalidFileError is return when a file is found that is not a pack file.
|
// InvalidFileError is return when a file is found that is not a pack file.
|
||||||
|
|
|
@ -22,8 +22,8 @@ func (rd *countingReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||||
|
|
||||||
func TestReadHeaderEagerLoad(t *testing.T) {
|
func TestReadHeaderEagerLoad(t *testing.T) {
|
||||||
|
|
||||||
testReadHeader := func(dataSize int, entryCount uint, expectedReadInvocationCount int) {
|
testReadHeader := func(dataSize, entryCount, expectedReadInvocationCount int) {
|
||||||
expectedHeader := rtest.Random(0, int(entryCount*entrySize)+crypto.Extension)
|
expectedHeader := rtest.Random(0, entryCount*int(entrySize)+crypto.Extension)
|
||||||
|
|
||||||
buf := &bytes.Buffer{}
|
buf := &bytes.Buffer{}
|
||||||
buf.Write(rtest.Random(0, dataSize)) // pack blobs data
|
buf.Write(rtest.Random(0, dataSize)) // pack blobs data
|
||||||
|
@ -58,3 +58,53 @@ func TestReadHeaderEagerLoad(t *testing.T) {
|
||||||
testReadHeader(dataSize+3, 1, 1)
|
testReadHeader(dataSize+3, 1, 1)
|
||||||
testReadHeader(dataSize+4, 1, 1)
|
testReadHeader(dataSize+4, 1, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadRecords(t *testing.T) {
|
||||||
|
testReadRecords := func(dataSize, entryCount, totalRecords int) {
|
||||||
|
totalHeader := rtest.Random(0, totalRecords*int(entrySize)+crypto.Extension)
|
||||||
|
off := len(totalHeader) - (entryCount*int(entrySize) + crypto.Extension)
|
||||||
|
if off < 0 {
|
||||||
|
off = 0
|
||||||
|
}
|
||||||
|
expectedHeader := totalHeader[off:]
|
||||||
|
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
buf.Write(rtest.Random(0, dataSize)) // pack blobs data
|
||||||
|
buf.Write(totalHeader) // pack header
|
||||||
|
binary.Write(buf, binary.LittleEndian, uint32(len(totalHeader))) // pack header length
|
||||||
|
|
||||||
|
rd := bytes.NewReader(buf.Bytes())
|
||||||
|
|
||||||
|
header, count, err := readRecords(rd, int64(rd.Len()), entryCount)
|
||||||
|
rtest.OK(t, err)
|
||||||
|
rtest.Equals(t, expectedHeader, header)
|
||||||
|
rtest.Equals(t, totalRecords, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// basic
|
||||||
|
testReadRecords(100, 1, 1)
|
||||||
|
testReadRecords(100, 0, 1)
|
||||||
|
testReadRecords(100, 1, 0)
|
||||||
|
|
||||||
|
// header entries ~ eager entries
|
||||||
|
testReadRecords(100, eagerEntries, eagerEntries-1)
|
||||||
|
testReadRecords(100, eagerEntries, eagerEntries)
|
||||||
|
testReadRecords(100, eagerEntries, eagerEntries+1)
|
||||||
|
|
||||||
|
// file size == eager header load size
|
||||||
|
eagerLoadSize := int((eagerEntries * entrySize) + crypto.Extension)
|
||||||
|
headerSize := int(1*entrySize) + crypto.Extension
|
||||||
|
dataSize := eagerLoadSize - headerSize - binary.Size(uint32(0))
|
||||||
|
testReadRecords(dataSize-1, 1, 1)
|
||||||
|
testReadRecords(dataSize, 1, 1)
|
||||||
|
testReadRecords(dataSize+1, 1, 1)
|
||||||
|
testReadRecords(dataSize+2, 1, 1)
|
||||||
|
testReadRecords(dataSize+3, 1, 1)
|
||||||
|
testReadRecords(dataSize+4, 1, 1)
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
for j := 0; j < 2; j++ {
|
||||||
|
testReadRecords(dataSize, i, j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue