forked from TrueCloudLab/distribution
Update to use blob interfaces
Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
parent
fddeb1c8d5
commit
9d64e461be
5 changed files with 204 additions and 199 deletions
|
@ -8,18 +8,15 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
|
||||||
|
|
||||||
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/digest"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type httpLayer struct {
|
type httpBlob struct {
|
||||||
*layers
|
*repository
|
||||||
|
|
||||||
size int64
|
desc distribution.Descriptor
|
||||||
digest digest.Digest
|
|
||||||
createdAt time.Time
|
|
||||||
|
|
||||||
rc io.ReadCloser // remote read closer
|
rc io.ReadCloser // remote read closer
|
||||||
brd *bufio.Reader // internal buffered io
|
brd *bufio.Reader // internal buffered io
|
||||||
|
@ -27,48 +24,40 @@ type httpLayer struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hl *httpLayer) CreatedAt() time.Time {
|
func (hb *httpBlob) Read(p []byte) (n int, err error) {
|
||||||
return hl.createdAt
|
if hb.err != nil {
|
||||||
}
|
return 0, hb.err
|
||||||
|
|
||||||
func (hl *httpLayer) Digest() digest.Digest {
|
|
||||||
return hl.digest
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hl *httpLayer) Read(p []byte) (n int, err error) {
|
|
||||||
if hl.err != nil {
|
|
||||||
return 0, hl.err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rd, err := hl.reader()
|
rd, err := hb.reader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err = rd.Read(p)
|
n, err = rd.Read(p)
|
||||||
hl.offset += int64(n)
|
hb.offset += int64(n)
|
||||||
|
|
||||||
// Simulate io.EOF error if we reach filesize.
|
// Simulate io.EOF error if we reach filesize.
|
||||||
if err == nil && hl.offset >= hl.size {
|
if err == nil && hb.offset >= hb.desc.Length {
|
||||||
err = io.EOF
|
err = io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hl *httpLayer) Seek(offset int64, whence int) (int64, error) {
|
func (hb *httpBlob) Seek(offset int64, whence int) (int64, error) {
|
||||||
if hl.err != nil {
|
if hb.err != nil {
|
||||||
return 0, hl.err
|
return 0, hb.err
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
newOffset := hl.offset
|
newOffset := hb.offset
|
||||||
|
|
||||||
switch whence {
|
switch whence {
|
||||||
case os.SEEK_CUR:
|
case os.SEEK_CUR:
|
||||||
newOffset += int64(offset)
|
newOffset += int64(offset)
|
||||||
case os.SEEK_END:
|
case os.SEEK_END:
|
||||||
newOffset = hl.size + int64(offset)
|
newOffset = hb.desc.Length + int64(offset)
|
||||||
case os.SEEK_SET:
|
case os.SEEK_SET:
|
||||||
newOffset = int64(offset)
|
newOffset = int64(offset)
|
||||||
}
|
}
|
||||||
|
@ -76,60 +65,60 @@ func (hl *httpLayer) Seek(offset int64, whence int) (int64, error) {
|
||||||
if newOffset < 0 {
|
if newOffset < 0 {
|
||||||
err = fmt.Errorf("cannot seek to negative position")
|
err = fmt.Errorf("cannot seek to negative position")
|
||||||
} else {
|
} else {
|
||||||
if hl.offset != newOffset {
|
if hb.offset != newOffset {
|
||||||
hl.reset()
|
hb.reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
// No problems, set the offset.
|
// No problems, set the offset.
|
||||||
hl.offset = newOffset
|
hb.offset = newOffset
|
||||||
}
|
}
|
||||||
|
|
||||||
return hl.offset, err
|
return hb.offset, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hl *httpLayer) Close() error {
|
func (hb *httpBlob) Close() error {
|
||||||
if hl.err != nil {
|
if hb.err != nil {
|
||||||
return hl.err
|
return hb.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// close and release reader chain
|
// close and release reader chain
|
||||||
if hl.rc != nil {
|
if hb.rc != nil {
|
||||||
hl.rc.Close()
|
hb.rc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
hl.rc = nil
|
hb.rc = nil
|
||||||
hl.brd = nil
|
hb.brd = nil
|
||||||
|
|
||||||
hl.err = fmt.Errorf("httpLayer: closed")
|
hb.err = fmt.Errorf("httpBlob: closed")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hl *httpLayer) reset() {
|
func (hb *httpBlob) reset() {
|
||||||
if hl.err != nil {
|
if hb.err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if hl.rc != nil {
|
if hb.rc != nil {
|
||||||
hl.rc.Close()
|
hb.rc.Close()
|
||||||
hl.rc = nil
|
hb.rc = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hl *httpLayer) reader() (io.Reader, error) {
|
func (hb *httpBlob) reader() (io.Reader, error) {
|
||||||
if hl.err != nil {
|
if hb.err != nil {
|
||||||
return nil, hl.err
|
return nil, hb.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if hl.rc != nil {
|
if hb.rc != nil {
|
||||||
return hl.brd, nil
|
return hb.brd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the offset is great than or equal to size, return a empty, noop reader.
|
// If the offset is great than or equal to size, return a empty, noop reader.
|
||||||
if hl.offset >= hl.size {
|
if hb.offset >= hb.desc.Length {
|
||||||
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
blobURL, err := hl.ub.BuildBlobURL(hl.name, hl.digest)
|
blobURL, err := hb.ub.BuildBlobURL(hb.name, hb.desc.Digest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -139,40 +128,32 @@ func (hl *httpLayer) reader() (io.Reader, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if hl.offset > 0 {
|
if hb.offset > 0 {
|
||||||
// TODO(stevvooe): Get this working correctly.
|
// TODO(stevvooe): Get this working correctly.
|
||||||
|
|
||||||
// If we are at different offset, issue a range request from there.
|
// If we are at different offset, issue a range request from there.
|
||||||
req.Header.Add("Range", fmt.Sprintf("1-"))
|
req.Header.Add("Range", fmt.Sprintf("1-"))
|
||||||
context.GetLogger(hl.context).Infof("Range: %s", req.Header.Get("Range"))
|
context.GetLogger(hb.context).Infof("Range: %s", req.Header.Get("Range"))
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := hl.client.Do(req)
|
resp, err := hb.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case resp.StatusCode == 200:
|
case resp.StatusCode == 200:
|
||||||
hl.rc = resp.Body
|
hb.rc = resp.Body
|
||||||
default:
|
default:
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
|
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
if hl.brd == nil {
|
if hb.brd == nil {
|
||||||
hl.brd = bufio.NewReader(hl.rc)
|
hb.brd = bufio.NewReader(hb.rc)
|
||||||
} else {
|
} else {
|
||||||
hl.brd.Reset(hl.rc)
|
hb.brd.Reset(hb.rc)
|
||||||
}
|
}
|
||||||
|
|
||||||
return hl.brd, nil
|
return hb.brd, nil
|
||||||
}
|
|
||||||
|
|
||||||
func (hl *httpLayer) Length() int64 {
|
|
||||||
return hl.size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hl *httpLayer) Handler(r *http.Request) (http.Handler, error) {
|
|
||||||
panic("Not implemented")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,10 +11,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
type httpLayerUpload struct {
|
type httpBlobUpload struct {
|
||||||
repo distribution.Repository
|
repo distribution.Repository
|
||||||
client *http.Client
|
client *http.Client
|
||||||
|
|
||||||
|
@ -26,32 +26,32 @@ type httpLayerUpload struct {
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) handleErrorResponse(resp *http.Response) error {
|
func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
return &BlobUploadNotFoundError{Location: hlu.location}
|
return &BlobUploadNotFoundError{Location: hbu.location}
|
||||||
}
|
}
|
||||||
return handleErrorResponse(resp)
|
return handleErrorResponse(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) ReadFrom(r io.Reader) (n int64, err error) {
|
func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
|
||||||
req, err := http.NewRequest("PATCH", hlu.location, ioutil.NopCloser(r))
|
req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer req.Body.Close()
|
defer req.Body.Close()
|
||||||
|
|
||||||
resp, err := hlu.client.Do(req)
|
resp, err := hbu.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusAccepted {
|
if resp.StatusCode != http.StatusAccepted {
|
||||||
return 0, hlu.handleErrorResponse(resp)
|
return 0, hbu.handleErrorResponse(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(dmcgowan): Validate headers
|
// TODO(dmcgowan): Validate headers
|
||||||
hlu.uuid = resp.Header.Get("Docker-Upload-UUID")
|
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
|
||||||
hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location)
|
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -67,27 +67,27 @@ func (hlu *httpLayerUpload) ReadFrom(r io.Reader) (n int64, err error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) Write(p []byte) (n int, err error) {
|
func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
|
||||||
req, err := http.NewRequest("PATCH", hlu.location, bytes.NewReader(p))
|
req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hlu.offset, hlu.offset+int64(len(p)-1)))
|
req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
|
||||||
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
|
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
|
||||||
req.Header.Set("Content-Type", "application/octet-stream")
|
req.Header.Set("Content-Type", "application/octet-stream")
|
||||||
|
|
||||||
resp, err := hlu.client.Do(req)
|
resp, err := hbu.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusAccepted {
|
if resp.StatusCode != http.StatusAccepted {
|
||||||
return 0, hlu.handleErrorResponse(resp)
|
return 0, hbu.handleErrorResponse(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(dmcgowan): Validate headers
|
// TODO(dmcgowan): Validate headers
|
||||||
hlu.uuid = resp.Header.Get("Docker-Upload-UUID")
|
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
|
||||||
hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location)
|
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -103,8 +103,8 @@ func (hlu *httpLayerUpload) Write(p []byte) (n int, err error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) Seek(offset int64, whence int) (int64, error) {
|
func (hbu *httpBlobUpload) Seek(offset int64, whence int) (int64, error) {
|
||||||
newOffset := hlu.offset
|
newOffset := hbu.offset
|
||||||
|
|
||||||
switch whence {
|
switch whence {
|
||||||
case os.SEEK_CUR:
|
case os.SEEK_CUR:
|
||||||
|
@ -115,47 +115,47 @@ func (hlu *httpLayerUpload) Seek(offset int64, whence int) (int64, error) {
|
||||||
newOffset = int64(offset)
|
newOffset = int64(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
hlu.offset = newOffset
|
hbu.offset = newOffset
|
||||||
|
|
||||||
return hlu.offset, nil
|
return hbu.offset, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) UUID() string {
|
func (hbu *httpBlobUpload) ID() string {
|
||||||
return hlu.uuid
|
return hbu.uuid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) StartedAt() time.Time {
|
func (hbu *httpBlobUpload) StartedAt() time.Time {
|
||||||
return hlu.startedAt
|
return hbu.startedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) Finish(digest digest.Digest) (distribution.Layer, error) {
|
func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
||||||
// TODO(dmcgowan): Check if already finished, if so just fetch
|
// TODO(dmcgowan): Check if already finished, if so just fetch
|
||||||
req, err := http.NewRequest("PUT", hlu.location, nil)
|
req, err := http.NewRequest("PUT", hbu.location, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
values := req.URL.Query()
|
values := req.URL.Query()
|
||||||
values.Set("digest", digest.String())
|
values.Set("digest", desc.Digest.String())
|
||||||
req.URL.RawQuery = values.Encode()
|
req.URL.RawQuery = values.Encode()
|
||||||
|
|
||||||
resp, err := hlu.client.Do(req)
|
resp, err := hbu.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusCreated {
|
if resp.StatusCode != http.StatusCreated {
|
||||||
return nil, hlu.handleErrorResponse(resp)
|
return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
return hlu.repo.Layers().Fetch(digest)
|
return hbu.repo.Blobs(ctx).Stat(ctx, desc.Digest)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) Cancel() error {
|
func (hbu *httpBlobUpload) Rollback(ctx context.Context) error {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hlu *httpLayerUpload) Close() error {
|
func (hbu *httpBlobUpload) Close() error {
|
||||||
hlu.closed = true
|
hbu.closed = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,8 +11,8 @@ import (
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test implements distribution.LayerUpload
|
// Test implements distribution.BlobWriter
|
||||||
var _ distribution.LayerUpload = &httpLayerUpload{}
|
var _ distribution.BlobWriter = &httpBlobUpload{}
|
||||||
|
|
||||||
func TestUploadReadFrom(t *testing.T) {
|
func TestUploadReadFrom(t *testing.T) {
|
||||||
_, b := newRandomBlob(64)
|
_, b := newRandomBlob(64)
|
||||||
|
@ -124,13 +124,13 @@ func TestUploadReadFrom(t *testing.T) {
|
||||||
e, c := testServer(m)
|
e, c := testServer(m)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
layerUpload := &httpLayerUpload{
|
blobUpload := &httpBlobUpload{
|
||||||
client: &http.Client{},
|
client: &http.Client{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Valid case
|
// Valid case
|
||||||
layerUpload.location = e + locationPath
|
blobUpload.location = e + locationPath
|
||||||
n, err := layerUpload.ReadFrom(bytes.NewReader(b))
|
n, err := blobUpload.ReadFrom(bytes.NewReader(b))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error calling ReadFrom: %s", err)
|
t.Fatalf("Error calling ReadFrom: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -139,15 +139,15 @@ func TestUploadReadFrom(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bad range
|
// Bad range
|
||||||
layerUpload.location = e + locationPath
|
blobUpload.location = e + locationPath
|
||||||
_, err = layerUpload.ReadFrom(bytes.NewReader(b))
|
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Expected error when bad range received")
|
t.Fatalf("Expected error when bad range received")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 404
|
// 404
|
||||||
layerUpload.location = e + locationPath
|
blobUpload.location = e + locationPath
|
||||||
_, err = layerUpload.ReadFrom(bytes.NewReader(b))
|
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Expected error when not found")
|
t.Fatalf("Expected error when not found")
|
||||||
}
|
}
|
||||||
|
@ -158,8 +158,8 @@ func TestUploadReadFrom(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 400 valid json
|
// 400 valid json
|
||||||
layerUpload.location = e + locationPath
|
blobUpload.location = e + locationPath
|
||||||
_, err = layerUpload.ReadFrom(bytes.NewReader(b))
|
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Expected error when not found")
|
t.Fatalf("Expected error when not found")
|
||||||
}
|
}
|
||||||
|
@ -181,8 +181,8 @@ func TestUploadReadFrom(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 400 invalid json
|
// 400 invalid json
|
||||||
layerUpload.location = e + locationPath
|
blobUpload.location = e + locationPath
|
||||||
_, err = layerUpload.ReadFrom(bytes.NewReader(b))
|
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Expected error when not found")
|
t.Fatalf("Expected error when not found")
|
||||||
}
|
}
|
||||||
|
@ -196,8 +196,8 @@ func TestUploadReadFrom(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 500
|
// 500
|
||||||
layerUpload.location = e + locationPath
|
blobUpload.location = e + locationPath
|
||||||
_, err = layerUpload.ReadFrom(bytes.NewReader(b))
|
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Expected error when not found")
|
t.Fatalf("Expected error when not found")
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -55,8 +56,8 @@ func (r *repository) Name() string {
|
||||||
return r.name
|
return r.name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *repository) Layers() distribution.LayerService {
|
func (r *repository) Blobs(ctx context.Context) distribution.BlobService {
|
||||||
return &layers{
|
return &blobs{
|
||||||
repository: r,
|
repository: r,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -229,7 +230,7 @@ func (ms *manifests) Delete(dgst digest.Digest) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type layers struct {
|
type blobs struct {
|
||||||
*repository
|
*repository
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,25 +255,55 @@ func sanitizeLocation(location, source string) (string, error) {
|
||||||
return location, nil
|
return location, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *layers) Exists(dgst digest.Digest) (bool, error) {
|
func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
|
||||||
_, err := ls.fetchLayer(dgst)
|
desc, err := ls.Stat(ctx, dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err := err.(type) {
|
return nil, err
|
||||||
case distribution.ErrUnknownLayer:
|
}
|
||||||
return false, nil
|
reader, err := ls.Open(ctx, desc)
|
||||||
default:
|
if err != nil {
|
||||||
return false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
return ioutil.ReadAll(reader)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *blobs) Open(ctx context.Context, desc distribution.Descriptor) (distribution.ReadSeekCloser, error) {
|
||||||
|
return &httpBlob{
|
||||||
|
repository: ls.repository,
|
||||||
|
desc: desc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, desc distribution.Descriptor) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
|
||||||
|
writer, err := ls.Writer(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return distribution.Descriptor{}, err
|
||||||
|
}
|
||||||
|
dgstr := digest.NewCanonicalDigester()
|
||||||
|
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr))
|
||||||
|
if err != nil {
|
||||||
|
return distribution.Descriptor{}, err
|
||||||
|
}
|
||||||
|
if n < int64(len(p)) {
|
||||||
|
return distribution.Descriptor{}, fmt.Errorf("short copy: wrote %d of %d", n, len(p))
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, nil
|
desc := distribution.Descriptor{
|
||||||
|
MediaType: mediaType,
|
||||||
|
Length: int64(len(p)),
|
||||||
|
Digest: dgstr.Digest(),
|
||||||
|
}
|
||||||
|
|
||||||
|
return writer.Commit(ctx, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *layers) Fetch(dgst digest.Digest) (distribution.Layer, error) {
|
func (ls *blobs) Writer(ctx context.Context) (distribution.BlobWriter, error) {
|
||||||
return ls.fetchLayer(dgst)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ls *layers) Upload() (distribution.LayerUpload, error) {
|
|
||||||
u, err := ls.ub.BuildBlobUploadURL(ls.name)
|
u, err := ls.ub.BuildBlobUploadURL(ls.name)
|
||||||
|
|
||||||
resp, err := ls.client.Post(u, "", nil)
|
resp, err := ls.client.Post(u, "", nil)
|
||||||
|
@ -290,7 +321,7 @@ func (ls *layers) Upload() (distribution.LayerUpload, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &httpLayerUpload{
|
return &httpBlobUpload{
|
||||||
repo: ls.repository,
|
repo: ls.repository,
|
||||||
client: ls.client,
|
client: ls.client,
|
||||||
uuid: uuid,
|
uuid: uuid,
|
||||||
|
@ -302,19 +333,19 @@ func (ls *layers) Upload() (distribution.LayerUpload, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *layers) Resume(uuid string) (distribution.LayerUpload, error) {
|
func (ls *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *layers) fetchLayer(dgst digest.Digest) (distribution.Layer, error) {
|
func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||||
u, err := ls.ub.BuildBlobURL(ls.name, dgst)
|
u, err := ls.ub.BuildBlobURL(ls.name, dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := ls.client.Head(u)
|
resp, err := ls.client.Head(u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
@ -323,31 +354,17 @@ func (ls *layers) fetchLayer(dgst digest.Digest) (distribution.Layer, error) {
|
||||||
lengthHeader := resp.Header.Get("Content-Length")
|
lengthHeader := resp.Header.Get("Content-Length")
|
||||||
length, err := strconv.ParseInt(lengthHeader, 10, 64)
|
length, err := strconv.ParseInt(lengthHeader, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error parsing content-length: %v", err)
|
return distribution.Descriptor{}, fmt.Errorf("error parsing content-length: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var t time.Time
|
return distribution.Descriptor{
|
||||||
lastModified := resp.Header.Get("Last-Modified")
|
MediaType: resp.Header.Get("Content-Type"),
|
||||||
if lastModified != "" {
|
Length: length,
|
||||||
t, err = http.ParseTime(lastModified)
|
Digest: dgst,
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error parsing last-modified: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &httpLayer{
|
|
||||||
layers: ls,
|
|
||||||
size: length,
|
|
||||||
digest: dgst,
|
|
||||||
createdAt: t,
|
|
||||||
}, nil
|
}, nil
|
||||||
case resp.StatusCode == http.StatusNotFound:
|
case resp.StatusCode == http.StatusNotFound:
|
||||||
return nil, distribution.ErrUnknownLayer{
|
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||||
FSLayer: manifest.FSLayer{
|
|
||||||
BlobSum: dgst,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
return nil, handleErrorResponse(resp)
|
return distribution.Descriptor{}, handleErrorResponse(resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
@ -15,6 +14,7 @@ import (
|
||||||
|
|
||||||
"code.google.com/p/go-uuid/uuid"
|
"code.google.com/p/go-uuid/uuid"
|
||||||
|
|
||||||
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
"github.com/docker/distribution/manifest"
|
"github.com/docker/distribution/manifest"
|
||||||
|
@ -88,7 +88,7 @@ func addPing(m *testutil.RequestResponseMap) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLayerFetch(t *testing.T) {
|
func TestBlobFetch(t *testing.T) {
|
||||||
d1, b1 := newRandomBlob(1024)
|
d1, b1 := newRandomBlob(1024)
|
||||||
var m testutil.RequestResponseMap
|
var m testutil.RequestResponseMap
|
||||||
addTestFetch("test.example.com/repo1", d1, b1, &m)
|
addTestFetch("test.example.com/repo1", d1, b1, &m)
|
||||||
|
@ -97,17 +97,14 @@ func TestLayerFetch(t *testing.T) {
|
||||||
e, c := testServer(m)
|
e, c := testServer(m)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
r, err := NewRepository(context.Background(), "test.example.com/repo1", e, nil)
|
ctx := context.Background()
|
||||||
|
r, err := NewRepository(ctx, "test.example.com/repo1", e, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
l := r.Layers()
|
l := r.Blobs(ctx)
|
||||||
|
|
||||||
layer, err := l.Fetch(d1)
|
b, err := l.Get(ctx, d1)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
b, err := ioutil.ReadAll(layer)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -118,7 +115,7 @@ func TestLayerFetch(t *testing.T) {
|
||||||
// TODO(dmcgowan): Test error cases
|
// TODO(dmcgowan): Test error cases
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLayerExists(t *testing.T) {
|
func TestBlobExists(t *testing.T) {
|
||||||
d1, b1 := newRandomBlob(1024)
|
d1, b1 := newRandomBlob(1024)
|
||||||
var m testutil.RequestResponseMap
|
var m testutil.RequestResponseMap
|
||||||
addTestFetch("test.example.com/repo1", d1, b1, &m)
|
addTestFetch("test.example.com/repo1", d1, b1, &m)
|
||||||
|
@ -127,24 +124,30 @@ func TestLayerExists(t *testing.T) {
|
||||||
e, c := testServer(m)
|
e, c := testServer(m)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
r, err := NewRepository(context.Background(), "test.example.com/repo1", e, nil)
|
ctx := context.Background()
|
||||||
|
r, err := NewRepository(ctx, "test.example.com/repo1", e, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
l := r.Layers()
|
l := r.Blobs(ctx)
|
||||||
|
|
||||||
ok, err := l.Exists(d1)
|
stat, err := l.Stat(ctx, d1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Blob does not exist: %s", d1)
|
if stat.Digest != d1 {
|
||||||
|
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, d1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(dmcgowan): Test error cases
|
if stat.Length != int64(len(b1)) {
|
||||||
|
t.Fatalf("Unexpected length: %d, expected %d", stat.Length, len(b1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(dmcgowan): Test error cases and ErrBlobUnknown case
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLayerUploadChunked(t *testing.T) {
|
func TestBlobUploadChunked(t *testing.T) {
|
||||||
dgst, b1 := newRandomBlob(1024)
|
dgst, b1 := newRandomBlob(1024)
|
||||||
var m testutil.RequestResponseMap
|
var m testutil.RequestResponseMap
|
||||||
addPing(&m)
|
addPing(&m)
|
||||||
|
@ -227,19 +230,20 @@ func TestLayerUploadChunked(t *testing.T) {
|
||||||
e, c := testServer(m)
|
e, c := testServer(m)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
r, err := NewRepository(context.Background(), repo, e, nil)
|
ctx := context.Background()
|
||||||
|
r, err := NewRepository(ctx, repo, e, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
l := r.Layers()
|
l := r.Blobs(ctx)
|
||||||
|
|
||||||
upload, err := l.Upload()
|
upload, err := l.Writer(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upload.UUID() != uuids[0] {
|
if upload.ID() != uuids[0] {
|
||||||
log.Fatalf("Unexpected UUID %s; expected %s", upload.UUID(), uuids[0])
|
log.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uuids[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, chunk := range chunks {
|
for _, chunk := range chunks {
|
||||||
|
@ -252,17 +256,20 @@ func TestLayerUploadChunked(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
layer, err := upload.Finish(dgst)
|
blob, err := upload.Commit(ctx, distribution.Descriptor{
|
||||||
|
Digest: dgst,
|
||||||
|
Length: int64(len(b1)),
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if layer.Length() != int64(len(b1)) {
|
if blob.Length != int64(len(b1)) {
|
||||||
t.Fatalf("Unexpected layer size: %d; expected: %d", layer.Length(), len(b1))
|
t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Length, len(b1))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLayerUploadMonolithic(t *testing.T) {
|
func TestBlobUploadMonolithic(t *testing.T) {
|
||||||
dgst, b1 := newRandomBlob(1024)
|
dgst, b1 := newRandomBlob(1024)
|
||||||
var m testutil.RequestResponseMap
|
var m testutil.RequestResponseMap
|
||||||
addPing(&m)
|
addPing(&m)
|
||||||
|
@ -334,19 +341,20 @@ func TestLayerUploadMonolithic(t *testing.T) {
|
||||||
e, c := testServer(m)
|
e, c := testServer(m)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
r, err := NewRepository(context.Background(), repo, e, nil)
|
ctx := context.Background()
|
||||||
|
r, err := NewRepository(ctx, repo, e, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
l := r.Layers()
|
l := r.Blobs(ctx)
|
||||||
|
|
||||||
upload, err := l.Upload()
|
upload, err := l.Writer(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if upload.UUID() != uploadID {
|
if upload.ID() != uploadID {
|
||||||
log.Fatalf("Unexpected UUID %s; expected %s", upload.UUID(), uploadID)
|
log.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uploadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := upload.ReadFrom(bytes.NewReader(b1))
|
n, err := upload.ReadFrom(bytes.NewReader(b1))
|
||||||
|
@ -357,20 +365,19 @@ func TestLayerUploadMonolithic(t *testing.T) {
|
||||||
t.Fatalf("Unexpected ReadFrom length: %d; expected: %d", n, len(b1))
|
t.Fatalf("Unexpected ReadFrom length: %d; expected: %d", n, len(b1))
|
||||||
}
|
}
|
||||||
|
|
||||||
layer, err := upload.Finish(dgst)
|
blob, err := upload.Commit(ctx, distribution.Descriptor{
|
||||||
|
Digest: dgst,
|
||||||
|
Length: int64(len(b1)),
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if layer.Length() != int64(len(b1)) {
|
if blob.Length != int64(len(b1)) {
|
||||||
t.Fatalf("Unexpected layer size: %d; expected: %d", layer.Length(), len(b1))
|
t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Length, len(b1))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLayerUploadResume(t *testing.T) {
|
|
||||||
// TODO(dmcgowan): implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRandomSchema1Manifest(name, tag string, blobCount int) (*manifest.SignedManifest, digest.Digest) {
|
func newRandomSchema1Manifest(name, tag string, blobCount int) (*manifest.SignedManifest, digest.Digest) {
|
||||||
blobs := make([]manifest.FSLayer, blobCount)
|
blobs := make([]manifest.FSLayer, blobCount)
|
||||||
history := make([]manifest.History, blobCount)
|
history := make([]manifest.History, blobCount)
|
||||||
|
@ -447,7 +454,7 @@ func checkEqualManifest(m1, m2 *manifest.SignedManifest) error {
|
||||||
return fmt.Errorf("tag does not match %q != %q", m1.Tag, m2.Tag)
|
return fmt.Errorf("tag does not match %q != %q", m1.Tag, m2.Tag)
|
||||||
}
|
}
|
||||||
if len(m1.FSLayers) != len(m2.FSLayers) {
|
if len(m1.FSLayers) != len(m2.FSLayers) {
|
||||||
return fmt.Errorf("fs layer length does not match %d != %d", len(m1.FSLayers), len(m2.FSLayers))
|
return fmt.Errorf("fs blob length does not match %d != %d", len(m1.FSLayers), len(m2.FSLayers))
|
||||||
}
|
}
|
||||||
for i := range m1.FSLayers {
|
for i := range m1.FSLayers {
|
||||||
if m1.FSLayers[i].BlobSum != m2.FSLayers[i].BlobSum {
|
if m1.FSLayers[i].BlobSum != m2.FSLayers[i].BlobSum {
|
||||||
|
|
Loading…
Reference in a new issue