package storage import ( "bufio" "fmt" "io" "os" "time" "github.com/docker/distribution/storagedriver" ) // remoteFileReader provides a read seeker interface to files stored in // storagedriver. Used to implement part of layer interface and will be used // to implement read side of LayerUpload. type fileReader struct { driver storagedriver.StorageDriver // identifying fields path string size int64 // size is the total layer size, must be set. modtime time.Time // mutable fields rc io.ReadCloser // remote read closer brd *bufio.Reader // internal buffered io offset int64 // offset is the current read offset err error // terminal error, if set, reader is closed } func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { // Grab the size of the layer file, ensuring existence. fi, err := driver.Stat(path) if err != nil { return nil, err } if fi.IsDir() { return nil, fmt.Errorf("cannot read a directory") } return &fileReader{ driver: driver, path: path, size: fi.Size(), modtime: fi.ModTime(), }, nil } func (fr *fileReader) Read(p []byte) (n int, err error) { if fr.err != nil { return 0, fr.err } rd, err := fr.reader() if err != nil { return 0, err } n, err = rd.Read(p) fr.offset += int64(n) // Simulate io.EOR error if we reach filesize. if err == nil && fr.offset >= fr.size { err = io.EOF } return n, err } func (fr *fileReader) Seek(offset int64, whence int) (int64, error) { if fr.err != nil { return 0, fr.err } var err error newOffset := fr.offset switch whence { case os.SEEK_CUR: newOffset += int64(offset) case os.SEEK_END: newOffset = fr.size + int64(offset) case os.SEEK_SET: newOffset = int64(offset) } if newOffset < 0 { err = fmt.Errorf("cannot seek to negative position") } else if newOffset > fr.size { err = fmt.Errorf("cannot seek passed end of file") } else { if fr.offset != newOffset { fr.reset() } // No problems, set the offset. fr.offset = newOffset } return fr.offset, err } // Close the layer. Should be called when the resource is no longer needed. func (fr *fileReader) Close() error { if fr.err != nil { return fr.err } fr.err = ErrLayerClosed // close and release reader chain if fr.rc != nil { fr.rc.Close() } fr.rc = nil fr.brd = nil return fr.err } // reader prepares the current reader at the lrs offset, ensuring its buffered // and ready to go. func (fr *fileReader) reader() (io.Reader, error) { if fr.err != nil { return nil, fr.err } if fr.rc != nil { return fr.brd, nil } // If we don't have a reader, open one up. rc, err := fr.driver.ReadStream(fr.path, fr.offset) if err != nil { return nil, err } fr.rc = rc if fr.brd == nil { // TODO(stevvooe): Set an optimal buffer size here. We'll have to // understand the latency characteristics of the underlying network to // set this correctly, so we may want to leave it to the driver. For // out of process drivers, we'll have to optimize this buffer size for // local communication. fr.brd = bufio.NewReader(fr.rc) } else { fr.brd.Reset(fr.rc) } return fr.brd, nil } // resetReader resets the reader, forcing the read method to open up a new // connection and rebuild the buffered reader. This should be called when the // offset and the reader will become out of sync, such as during a seek // operation. func (fr *fileReader) reset() { if fr.err != nil { return } if fr.rc != nil { fr.rc.Close() fr.rc = nil } }