Update storage package to use StorageDriver.Stat
This change updates the backend storage package, which consumes StorageDriver, to use the new Stat call in place of CurrentSize. It also makes minor updates to the use of WriteStream and ReadStream.
This commit is contained in:
parent 2ebc373d91
commit 70ab06b864

5 changed files with 37 additions and 33 deletions
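For orientation before the diff: the pattern adopted throughout is to make a single Stat call and read existence, size, and modification time from the returned FileInfo, instead of the old CurrentSize call. Below is a minimal, hedged sketch of that pattern in Go. The FileInfo and StorageDriver stand-ins are inferred from the calls visible in this diff (the real interfaces live in github.com/docker/docker-registry/storagedriver and may differ); statFile is illustrative and not part of the commit.

package storage

import (
	"fmt"
	"io"
	"time"
)

// Stand-ins inferred from the calls in this commit; not the real
// storagedriver definitions.
type FileInfo interface {
	Size() int64
	ModTime() time.Time
	IsDir() bool
}

type StorageDriver interface {
	Stat(path string) (FileInfo, error)
	ReadStream(path string, offset int64) (io.ReadCloser, error)
	WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error)
}

// statFile shows the consumer-side pattern: one Stat call yields
// existence (via the error), size, and modification time.
func statFile(driver StorageDriver, path string) (int64, time.Time, error) {
	fi, err := driver.Stat(path)
	if err != nil {
		return 0, time.Time{}, err
	}
	if fi.IsDir() {
		return 0, time.Time{}, fmt.Errorf("%s: cannot read a directory", path)
	}
	return fi.Size(), fi.ModTime(), nil
}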
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"time"

 	"github.com/docker/docker-registry/storagedriver"
 )
@@ -18,6 +19,7 @@ type fileReader struct {
 	// identifying fields
 	path string
 	size int64 // size is the total layer size, must be set.
+	modtime time.Time

 	// mutable fields
 	rc io.ReadCloser // remote read closer
@@ -28,16 +30,21 @@ type fileReader struct {
 func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
 	// Grab the size of the layer file, ensuring existence.
-	size, err := driver.CurrentSize(path)
+	fi, err := driver.Stat(path)

 	if err != nil {
 		return nil, err
 	}

+	if fi.IsDir() {
+		return nil, fmt.Errorf("cannot read a directory")
+	}
+
 	return &fileReader{
 		driver: driver,
 		path: path,
-		size: int64(size),
+		size: fi.Size(),
+		modtime: fi.ModTime(),
 	}, nil
 }
@@ -126,7 +133,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
 	}

 	// If we don't have a reader, open one up.
-	rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset))
+	rc, err := fr.driver.ReadStream(fr.path, fr.offset)

 	if err != nil {
 		return nil, err
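The ReadStream hunk above is just a signature change: the offset parameter is now a signed int64, matching what fileReader's io.Seeker implementation produces. A small illustrative helper under the stand-ins from the first sketch (readFrom is hypothetical, and ioutil is "io/ioutil"):

// readFrom reads from offset to EOF. With an int64 offset, the value
// tracked by a Seek implementation passes through without conversion.
func readFrom(driver StorageDriver, path string, offset int64) ([]byte, error) {
	rc, err := driver.ReadStream(path, offset)
	if err != nil {
		return nil, err
	}
	defer rc.Close()
	return ioutil.ReadAll(rc)
}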
@@ -13,7 +13,6 @@ type layerReader struct {

 	name string // repo name of this layer
 	digest digest.Digest
-	createdAt time.Time
 }

 var _ Layer = &layerReader{}
@@ -27,5 +26,5 @@ func (lrs *layerReader) Digest() digest.Digest {
 }

 func (lrs *layerReader) CreatedAt() time.Time {
-	return lrs.createdAt
+	return lrs.modtime
 }
@@ -1,8 +1,6 @@
 package storage

 import (
-	"time"
-
 	"github.com/docker/docker-registry/digest"
 	"github.com/docker/docker-registry/storagedriver"
 )
@@ -55,11 +53,6 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) {
 		fileReader: *fr,
 		name: name,
 		digest: digest,
-
-		// TODO(stevvooe): Storage backend does not support modification time
-		// queries yet. Layers "never" change, so just return the zero value
-		// plus a nano-second.
-		createdAt: (time.Time{}).Add(time.Nanosecond),
 	}, nil
 }
@@ -107,9 +107,13 @@ func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Laye
 		return nil, err
 	}

-	if err := luc.writeLayer(fp, size, digest); err != nil {
+	if nn, err := luc.writeLayer(fp, digest); err != nil {
 		// Cleanup?
 		return nil, err
+	} else if nn != size {
+		// TODO(stevvooe): Short write. Will have to delete the location and
+		// report an error. This error needs to be reported to the client.
+		return nil, fmt.Errorf("short write writing layer")
 	}

 	// Yes! We have written some layer data. Let's make it visible. Link the
@@ -281,19 +285,20 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
 	return dgst, nil
 }

-// writeLayer actually writes the the layer file into its final destination.
-// The layer should be validated before commencing the write.
-func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst digest.Digest) error {
+// writeLayer actually writes the the layer file into its final destination,
+// identified by dgst. The layer should be validated before commencing the
+// write.
+func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
 	blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
 		digest: dgst,
 	})

 	if err != nil {
-		return err
+		return 0, err
 	}

 	// Check for existence
-	if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
+	if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
 		// TODO(stevvooe): This check is kind of problematic and very racy.
 		switch err := err.(type) {
 		case storagedriver.PathNotFoundError:
@@ -303,22 +308,18 @@ func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst dige
 			// content addressable and we should just use this to ensure we
 			// have it written. Although, we do need to verify that the
 			// content that is there is the correct length.
-			return err
+			return 0, err
 		}
 	}

 	// Seek our local layer file back now.
 	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
 		// Cleanup?
-		return err
+		return 0, err
 	}

 	// Okay: we can write the file to the blob store.
-	if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
-		return err
-	}
-
-	return nil
+	return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
 }

 // linkLayer links a valid, written layer blob into the registry under the
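Taken together, the Finish and writeLayer hunks move short-write detection to the caller: WriteStream now reports the number of bytes written, writeLayer passes that count through, and Finish compares it against the expected layer size. A hedged sketch of that pattern under the same stand-ins (copyBlob is hypothetical):

// copyBlob writes r to path and fails loudly if the driver reports a
// short write, mirroring the nn != size check added in Finish.
func copyBlob(driver StorageDriver, path string, size int64, r io.Reader) error {
	nn, err := driver.WriteStream(path, 0, r)
	if err != nil {
		return err
	}
	if nn != size {
		// A short write leaves a partial blob behind; callers must treat
		// the destination path as invalid until it is rewritten.
		return fmt.Errorf("short write: wrote %d of %d bytes", nn, size)
	}
	return nil
}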
@@ -22,12 +22,16 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) {
 		return false, err
 	}

-	size, err := ms.driver.CurrentSize(p)
+	fi, err := ms.driver.Stat(p)
 	if err != nil {
 		return false, err
 	}

-	if size == 0 {
+	if fi.IsDir() {
+		return false, fmt.Errorf("unexpected directory at path: %v, name=%s tag=%s", p, name, tag)
+	}
+
+	if fi.Size() == 0 {
 		return false, nil
 	}
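The manifest existence check follows the same shape: Stat once, reject directories, then inspect the size. As a closing illustration, here is a Stat-based existence helper under the stand-ins above; treating a not-found error as "does not exist" mirrors the PathNotFoundError switch in writeLayer rather than this hunk, and the error type here is a stand-in too:

// PathNotFoundError stands in for storagedriver.PathNotFoundError.
type PathNotFoundError struct{ Path string }

func (e PathNotFoundError) Error() string { return "path not found: " + e.Path }

// exists reports whether a non-empty, non-directory entry is present,
// mirroring the Stat-based check manifestStore.Exists now performs.
func exists(driver StorageDriver, path string) (bool, error) {
	fi, err := driver.Stat(path)
	if err != nil {
		if _, ok := err.(PathNotFoundError); ok {
			return false, nil
		}
		return false, err
	}
	if fi.IsDir() {
		return false, fmt.Errorf("unexpected directory at %s", path)
	}
	return fi.Size() > 0, nil
}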