diff --git a/registry/client/blob.go b/registry/client/blob.go deleted file mode 100644 index e7c0039c6..000000000 --- a/registry/client/blob.go +++ /dev/null @@ -1,159 +0,0 @@ -package client - -import ( - "bufio" - "bytes" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - - "github.com/docker/distribution" - "github.com/docker/distribution/context" -) - -type httpBlob struct { - *repository - - desc distribution.Descriptor - - rc io.ReadCloser // remote read closer - brd *bufio.Reader // internal buffered io - offset int64 - err error -} - -func (hb *httpBlob) Read(p []byte) (n int, err error) { - if hb.err != nil { - return 0, hb.err - } - - rd, err := hb.reader() - if err != nil { - return 0, err - } - - n, err = rd.Read(p) - hb.offset += int64(n) - - // Simulate io.EOF error if we reach filesize. - if err == nil && hb.offset >= hb.desc.Length { - err = io.EOF - } - - return n, err -} - -func (hb *httpBlob) Seek(offset int64, whence int) (int64, error) { - if hb.err != nil { - return 0, hb.err - } - - var err error - newOffset := hb.offset - - switch whence { - case os.SEEK_CUR: - newOffset += int64(offset) - case os.SEEK_END: - newOffset = hb.desc.Length + int64(offset) - case os.SEEK_SET: - newOffset = int64(offset) - } - - if newOffset < 0 { - err = fmt.Errorf("cannot seek to negative position") - } else { - if hb.offset != newOffset { - hb.reset() - } - - // No problems, set the offset. - hb.offset = newOffset - } - - return hb.offset, err -} - -func (hb *httpBlob) Close() error { - if hb.err != nil { - return hb.err - } - - // close and release reader chain - if hb.rc != nil { - hb.rc.Close() - } - - hb.rc = nil - hb.brd = nil - - hb.err = fmt.Errorf("httpBlob: closed") - - return nil -} - -func (hb *httpBlob) reset() { - if hb.err != nil { - return - } - if hb.rc != nil { - hb.rc.Close() - hb.rc = nil - } -} - -func (hb *httpBlob) reader() (io.Reader, error) { - if hb.err != nil { - return nil, hb.err - } - - if hb.rc != nil { - return hb.brd, nil - } - - // If the offset is great than or equal to size, return a empty, noop reader. - if hb.offset >= hb.desc.Length { - return ioutil.NopCloser(bytes.NewReader([]byte{})), nil - } - - blobURL, err := hb.ub.BuildBlobURL(hb.name, hb.desc.Digest) - if err != nil { - return nil, err - } - - req, err := http.NewRequest("GET", blobURL, nil) - if err != nil { - return nil, err - } - - if hb.offset > 0 { - // TODO(stevvooe): Get this working correctly. - - // If we are at different offset, issue a range request from there. - req.Header.Add("Range", fmt.Sprintf("1-")) - context.GetLogger(hb.context).Infof("Range: %s", req.Header.Get("Range")) - } - - resp, err := hb.client.Do(req) - if err != nil { - return nil, err - } - - switch { - case resp.StatusCode == 200: - hb.rc = resp.Body - default: - defer resp.Body.Close() - return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) - } - - if hb.brd == nil { - hb.brd = bufio.NewReader(hb.rc) - } else { - hb.brd.Reset(hb.rc) - } - - return hb.brd, nil -} diff --git a/registry/client/blob_writer.go b/registry/client/blob_writer.go index 3697ef8c6..441511676 100644 --- a/registry/client/blob_writer.go +++ b/registry/client/blob_writer.go @@ -151,7 +151,7 @@ func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descrip return hbu.repo.Blobs(ctx).Stat(ctx, desc.Digest) } -func (hbu *httpBlobUpload) Rollback(ctx context.Context) error { +func (hbu *httpBlobUpload) Cancel(ctx context.Context) error { panic("not implemented") } diff --git a/registry/client/http_reader.go b/registry/client/http_reader.go new file mode 100644 index 000000000..22f9bfbc4 --- /dev/null +++ b/registry/client/http_reader.go @@ -0,0 +1,164 @@ +package client + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + + "github.com/docker/distribution" +) + +func NewHTTPReadSeeker(client *http.Client, url string, size int64) distribution.ReadSeekCloser { + return &httpReadSeeker{ + client: client, + url: url, + size: size, + } +} + +type httpReadSeeker struct { + client *http.Client + url string + + size int64 + + rc io.ReadCloser // remote read closer + brd *bufio.Reader // internal buffered io + offset int64 + err error +} + +func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { + if hrs.err != nil { + return 0, hrs.err + } + + rd, err := hrs.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hrs.offset += int64(n) + + // Simulate io.EOF error if we reach filesize. + if err == nil && hrs.offset >= hrs.size { + err = io.EOF + } + + return n, err +} + +func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { + if hrs.err != nil { + return 0, hrs.err + } + + var err error + newOffset := hrs.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = hrs.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = errors.New("cannot seek to negative position") + } else { + if hrs.offset != newOffset { + hrs.reset() + } + + // No problems, set the offset. + hrs.offset = newOffset + } + + return hrs.offset, err +} + +func (hrs *httpReadSeeker) Close() error { + if hrs.err != nil { + return hrs.err + } + + // close and release reader chain + if hrs.rc != nil { + hrs.rc.Close() + } + + hrs.rc = nil + hrs.brd = nil + + hrs.err = errors.New("httpLayer: closed") + + return nil +} + +func (hrs *httpReadSeeker) reset() { + if hrs.err != nil { + return + } + if hrs.rc != nil { + hrs.rc.Close() + hrs.rc = nil + } +} + +func (hrs *httpReadSeeker) reader() (io.Reader, error) { + if hrs.err != nil { + return nil, hrs.err + } + + if hrs.rc != nil { + return hrs.brd, nil + } + + // If the offset is great than or equal to size, return a empty, noop reader. + if hrs.offset >= hrs.size { + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + } + + req, err := http.NewRequest("GET", hrs.url, nil) + if err != nil { + return nil, err + } + + if hrs.offset > 0 { + // TODO(stevvooe): Get this working correctly. + + // If we are at different offset, issue a range request from there. + req.Header.Add("Range", "1-") + // TODO: get context in here + // context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range")) + } + + resp, err := hrs.client.Do(req) + if err != nil { + return nil, err + } + + switch { + case resp.StatusCode == 200: + hrs.rc = resp.Body + default: + defer resp.Body.Close() + return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) + } + + if hrs.brd == nil { + hrs.brd = bufio.NewReader(hrs.rc) + } else { + hrs.brd.Reset(hrs.rc) + } + + return hrs.brd, nil +} diff --git a/registry/client/repository.go b/registry/client/repository.go index 940ae1df9..61dcf0f44 100644 --- a/registry/client/repository.go +++ b/registry/client/repository.go @@ -18,6 +18,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/storage/cache" ) // NewRepository creates a new Repository for the given repository name and endpoint @@ -56,9 +57,13 @@ func (r *repository) Name() string { return r.name } -func (r *repository) Blobs(ctx context.Context) distribution.BlobService { +func (r *repository) Blobs(ctx context.Context) distribution.BlobStore { + statter := &blobStatter{ + repository: r, + } return &blobs{ repository: r, + statter: cache.NewCachedBlobStatter(cache.NewInMemoryBlobDescriptorCacheProvider(), statter), } } @@ -232,6 +237,8 @@ func (ms *manifests) Delete(dgst digest.Digest) error { type blobs struct { *repository + + statter distribution.BlobStatter } func sanitizeLocation(location, source string) (string, error) { @@ -255,12 +262,17 @@ func sanitizeLocation(location, source string) (string, error) { return location, nil } +func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + return ls.statter.Stat(ctx, dgst) + +} + func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { desc, err := ls.Stat(ctx, dgst) if err != nil { return nil, err } - reader, err := ls.Open(ctx, desc) + reader, err := ls.Open(ctx, desc.Digest) if err != nil { return nil, err } @@ -269,19 +281,26 @@ func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { return ioutil.ReadAll(reader) } -func (ls *blobs) Open(ctx context.Context, desc distribution.Descriptor) (distribution.ReadSeekCloser, error) { - return &httpBlob{ - repository: ls.repository, - desc: desc, - }, nil +func (ls *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + stat, err := ls.statter.Stat(ctx, dgst) + if err != nil { + return nil, err + } + + blobURL, err := ls.ub.BuildBlobURL(ls.Name(), stat.Digest) + if err != nil { + return nil, err + } + + return NewHTTPReadSeeker(ls.repository.client, blobURL, stat.Length), nil } -func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, desc distribution.Descriptor) error { +func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { return nil } func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { - writer, err := ls.Writer(ctx) + writer, err := ls.Create(ctx) if err != nil { return distribution.Descriptor{}, err } @@ -303,7 +322,7 @@ func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut return writer.Commit(ctx, desc) } -func (ls *blobs) Writer(ctx context.Context) (distribution.BlobWriter, error) { +func (ls *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { u, err := ls.ub.BuildBlobUploadURL(ls.name) resp, err := ls.client.Post(u, "", nil) @@ -337,7 +356,11 @@ func (ls *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter panic("not implemented") } -func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { +type blobStatter struct { + *repository +} + +func (ls *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { u, err := ls.ub.BuildBlobURL(ls.name, dgst) if err != nil { return distribution.Descriptor{}, err diff --git a/registry/client/repository_test.go b/registry/client/repository_test.go index 514f3ee2c..f0f403166 100644 --- a/registry/client/repository_test.go +++ b/registry/client/repository_test.go @@ -237,7 +237,7 @@ func TestBlobUploadChunked(t *testing.T) { } l := r.Blobs(ctx) - upload, err := l.Writer(ctx) + upload, err := l.Create(ctx) if err != nil { t.Fatal(err) } @@ -348,7 +348,7 @@ func TestBlobUploadMonolithic(t *testing.T) { } l := r.Blobs(ctx) - upload, err := l.Writer(ctx) + upload, err := l.Create(ctx) if err != nil { t.Fatal(err) }