diff --git a/registry/client/transport/http_reader.go b/registry/client/transport/http_reader.go index b27b6c23..22b0b9d6 100644 --- a/registry/client/transport/http_reader.go +++ b/registry/client/transport/http_reader.go @@ -1,12 +1,22 @@ package transport import ( - "bufio" "errors" "fmt" "io" "net/http" "os" + "regexp" + "strconv" +) + +var ( + contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\\*)`) + + // ErrWrongCodeForByteRange is returned if the client sends a request + // with a Range header but the server returns a 2xx or 3xx code other + // than 206 Partial Content. + ErrWrongCodeForByteRange = errors.New("expected HTTP 206 from byte range request") ) // ReadSeekCloser combines io.ReadSeeker with io.Closer. @@ -40,8 +50,6 @@ type httpReadSeeker struct { // rc is the remote read closer. rc io.ReadCloser - // brd is a buffer for internal buffered io. - brd *bufio.Reader // readerOffset tracks the offset as of the last read. readerOffset int64 // seekOffset allows Seek to override the offset. Seek changes @@ -79,11 +87,6 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { hrs.seekOffset += int64(n) hrs.readerOffset += int64(n) - // Simulate io.EOF error if we reach filesize. - if err == nil && hrs.size >= 0 && hrs.readerOffset >= hrs.size { - err = io.EOF - } - return n, err } @@ -92,8 +95,18 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { return 0, hrs.err } + lastReaderOffset := hrs.readerOffset + + if whence == os.SEEK_SET && hrs.rc == nil { + // If no request has been made yet, and we are seeking to an + // absolute position, set the read offset as well to avoid an + // unnecessary request. + hrs.readerOffset = offset + } + _, err := hrs.reader() if err != nil { + hrs.readerOffset = lastReaderOffset return 0, err } @@ -101,14 +114,14 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { switch whence { case os.SEEK_CUR: - newOffset += int64(offset) + newOffset += offset case os.SEEK_END: if hrs.size < 0 { return 0, errors.New("content length not known") } - newOffset = hrs.size + int64(offset) + newOffset = hrs.size + offset case os.SEEK_SET: - newOffset = int64(offset) + newOffset = offset } if newOffset < 0 { @@ -131,7 +144,6 @@ func (hrs *httpReadSeeker) Close() error { } hrs.rc = nil - hrs.brd = nil hrs.err = errors.New("httpLayer: closed") @@ -154,7 +166,7 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) { } if hrs.rc != nil { - return hrs.brd, nil + return hrs.rc, nil } req, err := http.NewRequest("GET", hrs.url, nil) @@ -163,10 +175,8 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) { } if hrs.readerOffset > 0 { - // TODO(stevvooe): Get this working correctly. - // If we are at different offset, issue a range request from there. - req.Header.Add("Range", "1-") + req.Header.Add("Range", fmt.Sprintf("bytes=%d-", hrs.readerOffset)) // TODO: get context in here // context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range")) } @@ -179,12 +189,55 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) { // Normally would use client.SuccessStatus, but that would be a cyclic // import if resp.StatusCode >= 200 && resp.StatusCode <= 399 { - hrs.rc = resp.Body - if resp.StatusCode == http.StatusOK { + if hrs.readerOffset > 0 { + if resp.StatusCode != http.StatusPartialContent { + return nil, ErrWrongCodeForByteRange + } + + contentRange := resp.Header.Get("Content-Range") + if contentRange == "" { + return nil, errors.New("no Content-Range header found in HTTP 206 response") + } + + submatches := contentRangeRegexp.FindStringSubmatch(contentRange) + if len(submatches) < 4 { + return nil, fmt.Errorf("could not parse Content-Range header: %s", contentRange) + } + + startByte, err := strconv.ParseUint(submatches[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("could not parse start of range in Content-Range header: %s", contentRange) + } + + if startByte != uint64(hrs.readerOffset) { + return nil, fmt.Errorf("received Content-Range starting at offset %d instead of requested %d", startByte, hrs.readerOffset) + } + + endByte, err := strconv.ParseUint(submatches[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("could not parse end of range in Content-Range header: %s", contentRange) + } + + if submatches[3] == "*" { + hrs.size = -1 + } else { + size, err := strconv.ParseUint(submatches[3], 10, 64) + if err != nil { + return nil, fmt.Errorf("could not parse total size in Content-Range header: %s", contentRange) + } + + if endByte+1 != size { + return nil, fmt.Errorf("range in Content-Range stops before the end of the content: %s", contentRange) + } + + hrs.size = int64(size) + } + } else if resp.StatusCode == http.StatusOK { hrs.size = resp.ContentLength } else { hrs.size = -1 } + hrs.rc = resp.Body } else { defer resp.Body.Close() if hrs.errorHandler != nil { @@ -193,11 +246,5 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) { return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) } - if hrs.brd == nil { - hrs.brd = bufio.NewReader(hrs.rc) - } else { - hrs.brd.Reset(hrs.rc) - } - - return hrs.brd, nil + return hrs.rc, nil }