forked from TrueCloudLab/distribution
Remove deprecated client interface
Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
parent
ce614b6de8
commit
174a732c94
5 changed files with 0 additions and 1540 deletions
|
@ -1,573 +0,0 @@
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/digest"
|
|
||||||
"github.com/docker/distribution/manifest"
|
|
||||||
"github.com/docker/distribution/registry/api/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client implements the client interface to the registry http api
|
|
||||||
type Client interface {
|
|
||||||
// GetImageManifest returns an image manifest for the image at the given
|
|
||||||
// name, tag pair.
|
|
||||||
GetImageManifest(name, tag string) (*manifest.SignedManifest, error)
|
|
||||||
|
|
||||||
// PutImageManifest uploads an image manifest for the image at the given
|
|
||||||
// name, tag pair.
|
|
||||||
PutImageManifest(name, tag string, imageManifest *manifest.SignedManifest) error
|
|
||||||
|
|
||||||
// DeleteImage removes the image at the given name, tag pair.
|
|
||||||
DeleteImage(name, tag string) error
|
|
||||||
|
|
||||||
// ListImageTags returns a list of all image tags with the given repository
|
|
||||||
// name.
|
|
||||||
ListImageTags(name string) ([]string, error)
|
|
||||||
|
|
||||||
// BlobLength returns the length of the blob stored at the given name,
|
|
||||||
// digest pair.
|
|
||||||
// Returns a length value of -1 on error or if the blob does not exist.
|
|
||||||
BlobLength(name string, dgst digest.Digest) (int, error)
|
|
||||||
|
|
||||||
// GetBlob returns the blob stored at the given name, digest pair in the
|
|
||||||
// form of an io.ReadCloser with the length of this blob.
|
|
||||||
// A nonzero byteOffset can be provided to receive a partial blob beginning
|
|
||||||
// at the given offset.
|
|
||||||
GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error)
|
|
||||||
|
|
||||||
// InitiateBlobUpload starts a blob upload in the given repository namespace
|
|
||||||
// and returns a unique location url to use for other blob upload methods.
|
|
||||||
InitiateBlobUpload(name string) (string, error)
|
|
||||||
|
|
||||||
// GetBlobUploadStatus returns the byte offset and length of the blob at the
|
|
||||||
// given upload location.
|
|
||||||
GetBlobUploadStatus(location string) (int, int, error)
|
|
||||||
|
|
||||||
// UploadBlob uploads a full blob to the registry.
|
|
||||||
UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error
|
|
||||||
|
|
||||||
// UploadBlobChunk uploads a blob chunk with a given length and startByte to
|
|
||||||
// the registry.
|
|
||||||
// FinishChunkedBlobUpload must be called to finalize this upload.
|
|
||||||
UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error
|
|
||||||
|
|
||||||
// FinishChunkedBlobUpload completes a chunked blob upload at a given
|
|
||||||
// location.
|
|
||||||
FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error
|
|
||||||
|
|
||||||
// CancelBlobUpload deletes all content at the unfinished blob upload
|
|
||||||
// location and invalidates any future calls to this blob upload.
|
|
||||||
CancelBlobUpload(location string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
patternRangeHeader = regexp.MustCompile("bytes=0-(\\d+)/(\\d+)")
|
|
||||||
)
|
|
||||||
|
|
||||||
// New returns a new Client which operates against a registry with the
|
|
||||||
// given base endpoint
|
|
||||||
// This endpoint should not include /v2/ or any part of the url after this.
|
|
||||||
func New(endpoint string) (Client, error) {
|
|
||||||
ub, err := v2.NewURLBuilderFromString(endpoint)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &clientImpl{
|
|
||||||
endpoint: endpoint,
|
|
||||||
ub: ub,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// clientImpl is the default implementation of the Client interface
|
|
||||||
type clientImpl struct {
|
|
||||||
endpoint string
|
|
||||||
ub *v2.URLBuilder
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(bbland): use consistent route generation between server and client
|
|
||||||
|
|
||||||
func (r *clientImpl) GetImageManifest(name, tag string) (*manifest.SignedManifest, error) {
|
|
||||||
manifestURL, err := r.ub.BuildManifestURL(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.Get(manifestURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusOK:
|
|
||||||
break
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return nil, &ImageManifestNotFoundError{Name: name, Tag: tag}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return nil, &errs
|
|
||||||
default:
|
|
||||||
return nil, &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
|
|
||||||
manifest := new(manifest.SignedManifest)
|
|
||||||
err = decoder.Decode(manifest)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return manifest, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) PutImageManifest(name, tag string, manifest *manifest.SignedManifest) error {
|
|
||||||
manifestURL, err := r.ub.BuildManifestURL(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
putRequest, err := http.NewRequest("PUT", manifestURL, bytes.NewReader(manifest.Raw))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(putRequest)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusOK || response.StatusCode == http.StatusAccepted:
|
|
||||||
return nil
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errors v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errors)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &errors
|
|
||||||
default:
|
|
||||||
return &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) DeleteImage(name, tag string) error {
|
|
||||||
manifestURL, err := r.ub.BuildManifestURL(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
deleteRequest, err := http.NewRequest("DELETE", manifestURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(deleteRequest)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusNoContent:
|
|
||||||
break
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return &ImageManifestNotFoundError{Name: name, Tag: tag}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &errs
|
|
||||||
default:
|
|
||||||
return &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) ListImageTags(name string) ([]string, error) {
|
|
||||||
tagsURL, err := r.ub.BuildTagsURL(name)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.Get(tagsURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusOK:
|
|
||||||
break
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return nil, &RepositoryNotFoundError{Name: name}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return nil, &errs
|
|
||||||
default:
|
|
||||||
return nil, &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
|
|
||||||
tags := struct {
|
|
||||||
Tags []string `json:"tags"`
|
|
||||||
}{}
|
|
||||||
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&tags)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tags.Tags, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) BlobLength(name string, dgst digest.Digest) (int, error) {
|
|
||||||
blobURL, err := r.ub.BuildBlobURL(name, dgst)
|
|
||||||
if err != nil {
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.Head(blobURL)
|
|
||||||
if err != nil {
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusOK:
|
|
||||||
lengthHeader := response.Header.Get("Content-Length")
|
|
||||||
length, err := strconv.ParseInt(lengthHeader, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
return int(length), nil
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return -1, nil
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
return -1, &errs
|
|
||||||
default:
|
|
||||||
return -1, &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error) {
|
|
||||||
blobURL, err := r.ub.BuildBlobURL(name, dgst)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
getRequest, err := http.NewRequest("GET", blobURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
getRequest.Header.Add("Range", fmt.Sprintf("%d-", byteOffset))
|
|
||||||
response, err := http.DefaultClient.Do(getRequest)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusOK:
|
|
||||||
lengthHeader := response.Header.Get("Content-Length")
|
|
||||||
length, err := strconv.ParseInt(lengthHeader, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
return response.Body, int(length), nil
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
response.Body.Close()
|
|
||||||
return nil, 0, &BlobNotFoundError{Name: name, Digest: dgst}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
return nil, 0, &errs
|
|
||||||
default:
|
|
||||||
response.Body.Close()
|
|
||||||
return nil, 0, &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) InitiateBlobUpload(name string) (string, error) {
|
|
||||||
uploadURL, err := r.ub.BuildBlobUploadURL(name)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
postRequest, err := http.NewRequest("POST", uploadURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(postRequest)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusAccepted:
|
|
||||||
return response.Header.Get("Location"), nil
|
|
||||||
// case response.StatusCode == http.StatusNotFound:
|
|
||||||
// return
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return "", &errs
|
|
||||||
default:
|
|
||||||
return "", &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) GetBlobUploadStatus(location string) (int, int, error) {
|
|
||||||
response, err := http.Get(location)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusNoContent:
|
|
||||||
return parseRangeHeader(response.Header.Get("Range"))
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return 0, 0, &BlobUploadNotFoundError{Location: location}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
return 0, 0, &errs
|
|
||||||
default:
|
|
||||||
return 0, 0, &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error {
|
|
||||||
defer blob.Close()
|
|
||||||
|
|
||||||
putRequest, err := http.NewRequest("PUT", location, blob)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
values := putRequest.URL.Query()
|
|
||||||
values.Set("digest", dgst.String())
|
|
||||||
putRequest.URL.RawQuery = values.Encode()
|
|
||||||
|
|
||||||
putRequest.Header.Set("Content-Type", "application/octet-stream")
|
|
||||||
putRequest.Header.Set("Content-Length", fmt.Sprint(length))
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(putRequest)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusCreated:
|
|
||||||
return nil
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return &BlobUploadNotFoundError{Location: location}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &errs
|
|
||||||
default:
|
|
||||||
return &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error {
|
|
||||||
defer blobChunk.Close()
|
|
||||||
|
|
||||||
putRequest, err := http.NewRequest("PUT", location, blobChunk)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
endByte := startByte + length
|
|
||||||
|
|
||||||
putRequest.Header.Set("Content-Type", "application/octet-stream")
|
|
||||||
putRequest.Header.Set("Content-Length", fmt.Sprint(length))
|
|
||||||
putRequest.Header.Set("Content-Range",
|
|
||||||
fmt.Sprintf("%d-%d/%d", startByte, endByte, endByte))
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(putRequest)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusAccepted:
|
|
||||||
return nil
|
|
||||||
case response.StatusCode == http.StatusRequestedRangeNotSatisfiable:
|
|
||||||
lastValidRange, blobSize, err := parseRangeHeader(response.Header.Get("Range"))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &BlobUploadInvalidRangeError{
|
|
||||||
Location: location,
|
|
||||||
LastValidRange: lastValidRange,
|
|
||||||
BlobSize: blobSize,
|
|
||||||
}
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return &BlobUploadNotFoundError{Location: location}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &errs
|
|
||||||
default:
|
|
||||||
return &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error {
|
|
||||||
putRequest, err := http.NewRequest("PUT", location, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
values := putRequest.URL.Query()
|
|
||||||
values.Set("digest", dgst.String())
|
|
||||||
putRequest.URL.RawQuery = values.Encode()
|
|
||||||
|
|
||||||
putRequest.Header.Set("Content-Type", "application/octet-stream")
|
|
||||||
putRequest.Header.Set("Content-Length", "0")
|
|
||||||
putRequest.Header.Set("Content-Range",
|
|
||||||
fmt.Sprintf("%d-%d/%d", length, length, length))
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(putRequest)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusCreated:
|
|
||||||
return nil
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return &BlobUploadNotFoundError{Location: location}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &errs
|
|
||||||
default:
|
|
||||||
return &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *clientImpl) CancelBlobUpload(location string) error {
|
|
||||||
deleteRequest, err := http.NewRequest("DELETE", location, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := http.DefaultClient.Do(deleteRequest)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer response.Body.Close()
|
|
||||||
|
|
||||||
// TODO(bbland): handle other status codes, like 5xx errors
|
|
||||||
switch {
|
|
||||||
case response.StatusCode == http.StatusNoContent:
|
|
||||||
return nil
|
|
||||||
case response.StatusCode == http.StatusNotFound:
|
|
||||||
return &BlobUploadNotFoundError{Location: location}
|
|
||||||
case response.StatusCode >= 400 && response.StatusCode < 500:
|
|
||||||
var errs v2.Errors
|
|
||||||
decoder := json.NewDecoder(response.Body)
|
|
||||||
err = decoder.Decode(&errs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &errs
|
|
||||||
default:
|
|
||||||
return &UnexpectedHTTPStatusError{Status: response.Status}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseRangeHeader parses out the offset and length from a returned Range
|
|
||||||
// header
|
|
||||||
func parseRangeHeader(byteRangeHeader string) (int, int, error) {
|
|
||||||
submatches := patternRangeHeader.FindStringSubmatch(byteRangeHeader)
|
|
||||||
if submatches == nil || len(submatches) < 3 {
|
|
||||||
return 0, 0, fmt.Errorf("Malformed Range header")
|
|
||||||
}
|
|
||||||
|
|
||||||
offset, err := strconv.Atoi(submatches[1])
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
length, err := strconv.Atoi(submatches[2])
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
return offset, length, nil
|
|
||||||
}
|
|
|
@ -1,440 +0,0 @@
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/digest"
|
|
||||||
"github.com/docker/distribution/manifest"
|
|
||||||
"github.com/docker/distribution/testutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
type testBlob struct {
|
|
||||||
digest digest.Digest
|
|
||||||
contents []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRangeHeaderParser(t *testing.T) {
|
|
||||||
const (
|
|
||||||
malformedRangeHeader = "bytes=0-A/C"
|
|
||||||
emptyRangeHeader = ""
|
|
||||||
rFirst = 100
|
|
||||||
rSecond = 200
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
wellformedRangeHeader = fmt.Sprintf("bytes=0-%d/%d", rFirst, rSecond)
|
|
||||||
)
|
|
||||||
|
|
||||||
if _, _, err := parseRangeHeader(malformedRangeHeader); err == nil {
|
|
||||||
t.Fatalf("malformedRangeHeader: error expected, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, _, err := parseRangeHeader(emptyRangeHeader); err == nil {
|
|
||||||
t.Fatalf("emptyRangeHeader: error expected, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
first, second, err := parseRangeHeader(wellformedRangeHeader)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("wellformedRangeHeader: unexpected error %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if first != rFirst || second != rSecond {
|
|
||||||
t.Fatalf("Range has been parsed unproperly: %d/%d", first, second)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPush(t *testing.T) {
|
|
||||||
name := "hello/world"
|
|
||||||
tag := "sometag"
|
|
||||||
testBlobs := []testBlob{
|
|
||||||
{
|
|
||||||
digest: "tarsum.v2+sha256:12345",
|
|
||||||
contents: []byte("some contents"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
digest: "tarsum.v2+sha256:98765",
|
|
||||||
contents: []byte("some other contents"),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
uploadLocations := make([]string, len(testBlobs))
|
|
||||||
blobs := make([]manifest.FSLayer, len(testBlobs))
|
|
||||||
history := make([]manifest.History, len(testBlobs))
|
|
||||||
|
|
||||||
for i, blob := range testBlobs {
|
|
||||||
// TODO(bbland): this is returning the same location for all uploads,
|
|
||||||
// because we can't know which blob will get which location.
|
|
||||||
// It's sort of okay because we're using unique digests, but this needs
|
|
||||||
// to change at some point.
|
|
||||||
uploadLocations[i] = fmt.Sprintf("/v2/%s/blobs/test-uuid", name)
|
|
||||||
blobs[i] = manifest.FSLayer{BlobSum: blob.digest}
|
|
||||||
history[i] = manifest.History{V1Compatibility: blob.digest.String()}
|
|
||||||
}
|
|
||||||
|
|
||||||
m := &manifest.SignedManifest{
|
|
||||||
Manifest: manifest.Manifest{
|
|
||||||
Name: name,
|
|
||||||
Tag: tag,
|
|
||||||
Architecture: "x86",
|
|
||||||
FSLayers: blobs,
|
|
||||||
History: history,
|
|
||||||
Versioned: manifest.Versioned{
|
|
||||||
SchemaVersion: 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
var err error
|
|
||||||
m.Raw, err = json.Marshal(m)
|
|
||||||
|
|
||||||
blobRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs))
|
|
||||||
for i, blob := range testBlobs {
|
|
||||||
blobRequestResponseMappings[2*i] = testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "POST",
|
|
||||||
Route: "/v2/" + name + "/blobs/uploads/",
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusAccepted,
|
|
||||||
Headers: http.Header(map[string][]string{
|
|
||||||
"Location": {uploadLocations[i]},
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
blobRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "PUT",
|
|
||||||
Route: uploadLocations[i],
|
|
||||||
QueryParams: map[string][]string{
|
|
||||||
"digest": {blob.digest.String()},
|
|
||||||
},
|
|
||||||
Body: blob.contents,
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusCreated,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "PUT",
|
|
||||||
Route: "/v2/" + name + "/manifests/" + tag,
|
|
||||||
Body: m.Raw,
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
var server *httptest.Server
|
|
||||||
|
|
||||||
// HACK(stevvooe): Super hack to follow: the request response map approach
|
|
||||||
// above does not let us correctly format the location header to the
|
|
||||||
// server url. This handler intercepts and re-writes the location header
|
|
||||||
// to the server url.
|
|
||||||
|
|
||||||
hack := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w = &headerInterceptingResponseWriter{ResponseWriter: w, serverURL: server.URL}
|
|
||||||
handler.ServeHTTP(w, r)
|
|
||||||
})
|
|
||||||
|
|
||||||
server = httptest.NewServer(hack)
|
|
||||||
client, err := New(server.URL)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error creating client: %v", err)
|
|
||||||
}
|
|
||||||
objectStore := &memoryObjectStore{
|
|
||||||
mutex: new(sync.Mutex),
|
|
||||||
manifestStorage: make(map[string]*manifest.SignedManifest),
|
|
||||||
layerStorage: make(map[digest.Digest]Layer),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, blob := range testBlobs {
|
|
||||||
l, err := objectStore.Layer(blob.digest)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
writer, err := l.Writer()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.SetSize(len(blob.contents))
|
|
||||||
writer.Write(blob.contents)
|
|
||||||
writer.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
objectStore.WriteManifest(name, tag, m)
|
|
||||||
|
|
||||||
err = Push(client, objectStore, name, tag)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPull(t *testing.T) {
|
|
||||||
name := "hello/world"
|
|
||||||
tag := "sometag"
|
|
||||||
testBlobs := []testBlob{
|
|
||||||
{
|
|
||||||
digest: "tarsum.v2+sha256:12345",
|
|
||||||
contents: []byte("some contents"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
digest: "tarsum.v2+sha256:98765",
|
|
||||||
contents: []byte("some other contents"),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
blobs := make([]manifest.FSLayer, len(testBlobs))
|
|
||||||
history := make([]manifest.History, len(testBlobs))
|
|
||||||
|
|
||||||
for i, blob := range testBlobs {
|
|
||||||
blobs[i] = manifest.FSLayer{BlobSum: blob.digest}
|
|
||||||
history[i] = manifest.History{V1Compatibility: blob.digest.String()}
|
|
||||||
}
|
|
||||||
|
|
||||||
m := &manifest.SignedManifest{
|
|
||||||
Manifest: manifest.Manifest{
|
|
||||||
Name: name,
|
|
||||||
Tag: tag,
|
|
||||||
Architecture: "x86",
|
|
||||||
FSLayers: blobs,
|
|
||||||
History: history,
|
|
||||||
Versioned: manifest.Versioned{
|
|
||||||
SchemaVersion: 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
manifestBytes, err := json.Marshal(m)
|
|
||||||
|
|
||||||
blobRequestResponseMappings := make([]testutil.RequestResponseMapping, len(testBlobs))
|
|
||||||
for i, blob := range testBlobs {
|
|
||||||
blobRequestResponseMappings[i] = testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "GET",
|
|
||||||
Route: "/v2/" + name + "/blobs/" + blob.digest.String(),
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: blob.contents,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "GET",
|
|
||||||
Route: "/v2/" + name + "/manifests/" + tag,
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: manifestBytes,
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
server := httptest.NewServer(handler)
|
|
||||||
client, err := New(server.URL)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error creating client: %v", err)
|
|
||||||
}
|
|
||||||
objectStore := &memoryObjectStore{
|
|
||||||
mutex: new(sync.Mutex),
|
|
||||||
manifestStorage: make(map[string]*manifest.SignedManifest),
|
|
||||||
layerStorage: make(map[digest.Digest]Layer),
|
|
||||||
}
|
|
||||||
|
|
||||||
err = Pull(client, objectStore, name, tag)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m, err = objectStore.Manifest(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
mBytes, err := json.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(mBytes) != string(manifestBytes) {
|
|
||||||
t.Fatal("Incorrect manifest")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, blob := range testBlobs {
|
|
||||||
l, err := objectStore.Layer(blob.digest)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reader, err := l.Reader()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer reader.Close()
|
|
||||||
|
|
||||||
blobBytes, err := ioutil.ReadAll(reader)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(blobBytes) != string(blob.contents) {
|
|
||||||
t.Fatal("Incorrect blob")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPullResume(t *testing.T) {
|
|
||||||
name := "hello/world"
|
|
||||||
tag := "sometag"
|
|
||||||
testBlobs := []testBlob{
|
|
||||||
{
|
|
||||||
digest: "tarsum.v2+sha256:12345",
|
|
||||||
contents: []byte("some contents"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
digest: "tarsum.v2+sha256:98765",
|
|
||||||
contents: []byte("some other contents"),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
layers := make([]manifest.FSLayer, len(testBlobs))
|
|
||||||
history := make([]manifest.History, len(testBlobs))
|
|
||||||
|
|
||||||
for i, layer := range testBlobs {
|
|
||||||
layers[i] = manifest.FSLayer{BlobSum: layer.digest}
|
|
||||||
history[i] = manifest.History{V1Compatibility: layer.digest.String()}
|
|
||||||
}
|
|
||||||
|
|
||||||
m := &manifest.Manifest{
|
|
||||||
Name: name,
|
|
||||||
Tag: tag,
|
|
||||||
Architecture: "x86",
|
|
||||||
FSLayers: layers,
|
|
||||||
History: history,
|
|
||||||
Versioned: manifest.Versioned{
|
|
||||||
SchemaVersion: 1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
manifestBytes, err := json.Marshal(m)
|
|
||||||
|
|
||||||
layerRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs))
|
|
||||||
for i, blob := range testBlobs {
|
|
||||||
layerRequestResponseMappings[2*i] = testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "GET",
|
|
||||||
Route: "/v2/" + name + "/blobs/" + blob.digest.String(),
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: blob.contents[:len(blob.contents)/2],
|
|
||||||
Headers: http.Header(map[string][]string{
|
|
||||||
"Content-Length": {fmt.Sprint(len(blob.contents))},
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
layerRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "GET",
|
|
||||||
Route: "/v2/" + name + "/blobs/" + blob.digest.String(),
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: blob.contents[len(blob.contents)/2:],
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
|
||||||
layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMapping{
|
|
||||||
Request: testutil.Request{
|
|
||||||
Method: "GET",
|
|
||||||
Route: "/v2/" + name + "/manifests/" + tag,
|
|
||||||
},
|
|
||||||
Response: testutil.Response{
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Body: manifestBytes,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
handler := testutil.NewHandler(layerRequestResponseMappings)
|
|
||||||
server := httptest.NewServer(handler)
|
|
||||||
client, err := New(server.URL)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error creating client: %v", err)
|
|
||||||
}
|
|
||||||
objectStore := &memoryObjectStore{
|
|
||||||
mutex: new(sync.Mutex),
|
|
||||||
manifestStorage: make(map[string]*manifest.SignedManifest),
|
|
||||||
layerStorage: make(map[digest.Digest]Layer),
|
|
||||||
}
|
|
||||||
|
|
||||||
for attempts := 0; attempts < 3; attempts++ {
|
|
||||||
err = Pull(client, objectStore, name, tag)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sm, err := objectStore.Manifest(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
mBytes, err := json.Marshal(sm)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(mBytes) != string(manifestBytes) {
|
|
||||||
t.Fatal("Incorrect manifest")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, blob := range testBlobs {
|
|
||||||
l, err := objectStore.Layer(blob.digest)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reader, err := l.Reader()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer reader.Close()
|
|
||||||
|
|
||||||
layerBytes, err := ioutil.ReadAll(reader)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(layerBytes) != string(blob.contents) {
|
|
||||||
t.Fatal("Incorrect blob")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// headerInterceptingResponseWriter is a hacky workaround to re-write the
|
|
||||||
// location header to have the server url.
|
|
||||||
type headerInterceptingResponseWriter struct {
|
|
||||||
http.ResponseWriter
|
|
||||||
serverURL string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hirw *headerInterceptingResponseWriter) WriteHeader(status int) {
|
|
||||||
location := hirw.Header().Get("Location")
|
|
||||||
if location != "" {
|
|
||||||
hirw.Header().Set("Location", hirw.serverURL+location)
|
|
||||||
}
|
|
||||||
|
|
||||||
hirw.ResponseWriter.WriteHeader(status)
|
|
||||||
}
|
|
|
@ -1,239 +0,0 @@
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/digest"
|
|
||||||
"github.com/docker/distribution/manifest"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrLayerAlreadyExists is returned when attempting to create a layer with
|
|
||||||
// a tarsum that is already in use.
|
|
||||||
ErrLayerAlreadyExists = fmt.Errorf("Layer already exists")
|
|
||||||
|
|
||||||
// ErrLayerLocked is returned when attempting to write to a layer which is
|
|
||||||
// currently being written to.
|
|
||||||
ErrLayerLocked = fmt.Errorf("Layer locked")
|
|
||||||
)
|
|
||||||
|
|
||||||
// ObjectStore is an interface which is designed to approximate the docker
|
|
||||||
// engine storage. This interface is subject to change to conform to the
|
|
||||||
// future requirements of the engine.
|
|
||||||
type ObjectStore interface {
|
|
||||||
// Manifest retrieves the image manifest stored at the given repository name
|
|
||||||
// and tag
|
|
||||||
Manifest(name, tag string) (*manifest.SignedManifest, error)
|
|
||||||
|
|
||||||
// WriteManifest stores an image manifest at the given repository name and
|
|
||||||
// tag
|
|
||||||
WriteManifest(name, tag string, manifest *manifest.SignedManifest) error
|
|
||||||
|
|
||||||
// Layer returns a handle to a layer for reading and writing
|
|
||||||
Layer(dgst digest.Digest) (Layer, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Layer is a generic image layer interface.
|
|
||||||
// A Layer may not be written to if it is already complete.
|
|
||||||
type Layer interface {
|
|
||||||
// Reader returns a LayerReader or an error if the layer has not been
|
|
||||||
// written to or is currently being written to.
|
|
||||||
Reader() (LayerReader, error)
|
|
||||||
|
|
||||||
// Writer returns a LayerWriter or an error if the layer has been fully
|
|
||||||
// written to or is currently being written to.
|
|
||||||
Writer() (LayerWriter, error)
|
|
||||||
|
|
||||||
// Wait blocks until the Layer can be read from.
|
|
||||||
Wait() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// LayerReader is a read-only handle to a Layer, which exposes the CurrentSize
|
|
||||||
// and full Size in addition to implementing the io.ReadCloser interface.
|
|
||||||
type LayerReader interface {
|
|
||||||
io.ReadCloser
|
|
||||||
|
|
||||||
// CurrentSize returns the number of bytes written to the underlying Layer
|
|
||||||
CurrentSize() int
|
|
||||||
|
|
||||||
// Size returns the full size of the underlying Layer
|
|
||||||
Size() int
|
|
||||||
}
|
|
||||||
|
|
||||||
// LayerWriter is a write-only handle to a Layer, which exposes the CurrentSize
|
|
||||||
// and full Size in addition to implementing the io.WriteCloser interface.
|
|
||||||
// SetSize must be called on this LayerWriter before it can be written to.
|
|
||||||
type LayerWriter interface {
|
|
||||||
io.WriteCloser
|
|
||||||
|
|
||||||
// CurrentSize returns the number of bytes written to the underlying Layer
|
|
||||||
CurrentSize() int
|
|
||||||
|
|
||||||
// Size returns the full size of the underlying Layer
|
|
||||||
Size() int
|
|
||||||
|
|
||||||
// SetSize sets the full size of the underlying Layer.
|
|
||||||
// This must be called before any calls to Write
|
|
||||||
SetSize(int) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// memoryObjectStore is an in-memory implementation of the ObjectStore interface
|
|
||||||
type memoryObjectStore struct {
|
|
||||||
mutex *sync.Mutex
|
|
||||||
manifestStorage map[string]*manifest.SignedManifest
|
|
||||||
layerStorage map[digest.Digest]Layer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (objStore *memoryObjectStore) Manifest(name, tag string) (*manifest.SignedManifest, error) {
|
|
||||||
objStore.mutex.Lock()
|
|
||||||
defer objStore.mutex.Unlock()
|
|
||||||
|
|
||||||
manifest, ok := objStore.manifestStorage[name+":"+tag]
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("No manifest found with Name: %q, Tag: %q", name, tag)
|
|
||||||
}
|
|
||||||
return manifest, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *manifest.SignedManifest) error {
|
|
||||||
objStore.mutex.Lock()
|
|
||||||
defer objStore.mutex.Unlock()
|
|
||||||
|
|
||||||
objStore.manifestStorage[name+":"+tag] = manifest
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (objStore *memoryObjectStore) Layer(dgst digest.Digest) (Layer, error) {
|
|
||||||
objStore.mutex.Lock()
|
|
||||||
defer objStore.mutex.Unlock()
|
|
||||||
|
|
||||||
layer, ok := objStore.layerStorage[dgst]
|
|
||||||
if !ok {
|
|
||||||
layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))}
|
|
||||||
objStore.layerStorage[dgst] = layer
|
|
||||||
}
|
|
||||||
|
|
||||||
return layer, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type memoryLayer struct {
|
|
||||||
cond *sync.Cond
|
|
||||||
contents []byte
|
|
||||||
expectedSize int
|
|
||||||
writing bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ml *memoryLayer) Reader() (LayerReader, error) {
|
|
||||||
ml.cond.L.Lock()
|
|
||||||
defer ml.cond.L.Unlock()
|
|
||||||
|
|
||||||
if ml.contents == nil {
|
|
||||||
return nil, fmt.Errorf("Layer has not been written to yet")
|
|
||||||
}
|
|
||||||
if ml.writing {
|
|
||||||
return nil, ErrLayerLocked
|
|
||||||
}
|
|
||||||
|
|
||||||
return &memoryLayerReader{ml: ml, reader: bytes.NewReader(ml.contents)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ml *memoryLayer) Writer() (LayerWriter, error) {
|
|
||||||
ml.cond.L.Lock()
|
|
||||||
defer ml.cond.L.Unlock()
|
|
||||||
|
|
||||||
if ml.contents != nil {
|
|
||||||
if ml.writing {
|
|
||||||
return nil, ErrLayerLocked
|
|
||||||
}
|
|
||||||
if ml.expectedSize == len(ml.contents) {
|
|
||||||
return nil, ErrLayerAlreadyExists
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ml.contents = make([]byte, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
ml.writing = true
|
|
||||||
return &memoryLayerWriter{ml: ml, buffer: bytes.NewBuffer(ml.contents)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ml *memoryLayer) Wait() error {
|
|
||||||
ml.cond.L.Lock()
|
|
||||||
defer ml.cond.L.Unlock()
|
|
||||||
|
|
||||||
if ml.contents == nil {
|
|
||||||
return fmt.Errorf("No writer to wait on")
|
|
||||||
}
|
|
||||||
|
|
||||||
for ml.writing {
|
|
||||||
ml.cond.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type memoryLayerReader struct {
|
|
||||||
ml *memoryLayer
|
|
||||||
reader *bytes.Reader
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlr *memoryLayerReader) Read(p []byte) (int, error) {
|
|
||||||
return mlr.reader.Read(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlr *memoryLayerReader) Close() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlr *memoryLayerReader) CurrentSize() int {
|
|
||||||
return len(mlr.ml.contents)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlr *memoryLayerReader) Size() int {
|
|
||||||
return mlr.ml.expectedSize
|
|
||||||
}
|
|
||||||
|
|
||||||
type memoryLayerWriter struct {
|
|
||||||
ml *memoryLayer
|
|
||||||
buffer *bytes.Buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlw *memoryLayerWriter) Write(p []byte) (int, error) {
|
|
||||||
if mlw.ml.expectedSize == 0 {
|
|
||||||
return 0, fmt.Errorf("Must set size before writing to layer")
|
|
||||||
}
|
|
||||||
wrote, err := mlw.buffer.Write(p)
|
|
||||||
mlw.ml.contents = mlw.buffer.Bytes()
|
|
||||||
return wrote, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlw *memoryLayerWriter) Close() error {
|
|
||||||
mlw.ml.cond.L.Lock()
|
|
||||||
defer mlw.ml.cond.L.Unlock()
|
|
||||||
|
|
||||||
return mlw.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlw *memoryLayerWriter) close() error {
|
|
||||||
mlw.ml.writing = false
|
|
||||||
mlw.ml.cond.Broadcast()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlw *memoryLayerWriter) CurrentSize() int {
|
|
||||||
return len(mlw.ml.contents)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlw *memoryLayerWriter) Size() int {
|
|
||||||
return mlw.ml.expectedSize
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mlw *memoryLayerWriter) SetSize(size int) error {
|
|
||||||
if !mlw.ml.writing {
|
|
||||||
return fmt.Errorf("Layer is closed for writing")
|
|
||||||
}
|
|
||||||
mlw.ml.expectedSize = size
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,151 +0,0 @@
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/manifest"
|
|
||||||
)
|
|
||||||
|
|
||||||
// simultaneousLayerPullWindow is the size of the parallel layer pull window.
|
|
||||||
// A layer may not be pulled until the layer preceeding it by the length of the
|
|
||||||
// pull window has been successfully pulled.
|
|
||||||
const simultaneousLayerPullWindow = 4
|
|
||||||
|
|
||||||
// Pull implements a client pull workflow for the image defined by the given
|
|
||||||
// name and tag pair, using the given ObjectStore for local manifest and layer
|
|
||||||
// storage
|
|
||||||
func Pull(c Client, objectStore ObjectStore, name, tag string) error {
|
|
||||||
manifest, err := c.GetImageManifest(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.WithField("manifest", manifest).Info("Pulled manifest")
|
|
||||||
|
|
||||||
if len(manifest.FSLayers) != len(manifest.History) {
|
|
||||||
return fmt.Errorf("Length of history not equal to number of layers")
|
|
||||||
}
|
|
||||||
if len(manifest.FSLayers) == 0 {
|
|
||||||
return fmt.Errorf("Image has no layers")
|
|
||||||
}
|
|
||||||
|
|
||||||
errChans := make([]chan error, len(manifest.FSLayers))
|
|
||||||
for i := range manifest.FSLayers {
|
|
||||||
errChans[i] = make(chan error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// To avoid leak of goroutines we must notify
|
|
||||||
// pullLayer goroutines about a cancelation,
|
|
||||||
// otherwise they will lock forever.
|
|
||||||
cancelCh := make(chan struct{})
|
|
||||||
|
|
||||||
// Iterate over each layer in the manifest, simultaneously pulling no more
|
|
||||||
// than simultaneousLayerPullWindow layers at a time. If an error is
|
|
||||||
// received from a layer pull, we abort the push.
|
|
||||||
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ {
|
|
||||||
dependentLayer := i - simultaneousLayerPullWindow
|
|
||||||
if dependentLayer >= 0 {
|
|
||||||
err := <-errChans[dependentLayer]
|
|
||||||
if err != nil {
|
|
||||||
log.WithField("error", err).Warn("Pull aborted")
|
|
||||||
close(cancelCh)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if i < len(manifest.FSLayers) {
|
|
||||||
go func(i int) {
|
|
||||||
select {
|
|
||||||
case errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]):
|
|
||||||
case <-cancelCh: // no chance to recv until cancelCh's closed
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = objectStore.WriteManifest(name, tag, manifest)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"manifest": manifest,
|
|
||||||
}).Warn("Unable to write image manifest")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
|
|
||||||
log.WithField("layer", fsLayer).Info("Pulling layer")
|
|
||||||
|
|
||||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to write local layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
layerWriter, err := layer.Writer()
|
|
||||||
if err == ErrLayerAlreadyExists {
|
|
||||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err == ErrLayerLocked {
|
|
||||||
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
|
|
||||||
layer.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to write local layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer layerWriter.Close()
|
|
||||||
|
|
||||||
if layerWriter.CurrentSize() > 0 {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"layer": fsLayer,
|
|
||||||
"currentSize": layerWriter.CurrentSize(),
|
|
||||||
"size": layerWriter.Size(),
|
|
||||||
}).Info("Layer partially downloaded, resuming")
|
|
||||||
}
|
|
||||||
|
|
||||||
layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize())
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to download layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer layerReader.Close()
|
|
||||||
|
|
||||||
layerWriter.SetSize(layerWriter.CurrentSize() + length)
|
|
||||||
|
|
||||||
_, err = io.Copy(layerWriter, layerReader)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to download layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if layerWriter.CurrentSize() != layerWriter.Size() {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"size": layerWriter.Size(),
|
|
||||||
"currentSize": layerWriter.CurrentSize(),
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Layer invalid size")
|
|
||||||
return fmt.Errorf(
|
|
||||||
"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d",
|
|
||||||
fsLayer, layerWriter.Size(), layerWriter.CurrentSize(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,137 +0,0 @@
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
"github.com/docker/distribution/manifest"
|
|
||||||
)
|
|
||||||
|
|
||||||
// simultaneousLayerPushWindow is the size of the parallel layer push window.
|
|
||||||
// A layer may not be pushed until the layer preceeding it by the length of the
|
|
||||||
// push window has been successfully pushed.
|
|
||||||
const simultaneousLayerPushWindow = 4
|
|
||||||
|
|
||||||
type pushFunction func(fsLayer manifest.FSLayer) error
|
|
||||||
|
|
||||||
// Push implements a client push workflow for the image defined by the given
|
|
||||||
// name and tag pair, using the given ObjectStore for local manifest and layer
|
|
||||||
// storage
|
|
||||||
func Push(c Client, objectStore ObjectStore, name, tag string) error {
|
|
||||||
manifest, err := objectStore.Manifest(name, tag)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"name": name,
|
|
||||||
"tag": tag,
|
|
||||||
}).Info("No image found")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
errChans := make([]chan error, len(manifest.FSLayers))
|
|
||||||
for i := range manifest.FSLayers {
|
|
||||||
errChans[i] = make(chan error)
|
|
||||||
}
|
|
||||||
|
|
||||||
cancelCh := make(chan struct{})
|
|
||||||
|
|
||||||
// Iterate over each layer in the manifest, simultaneously pushing no more
|
|
||||||
// than simultaneousLayerPushWindow layers at a time. If an error is
|
|
||||||
// received from a layer push, we abort the push.
|
|
||||||
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
|
|
||||||
dependentLayer := i - simultaneousLayerPushWindow
|
|
||||||
if dependentLayer >= 0 {
|
|
||||||
err := <-errChans[dependentLayer]
|
|
||||||
if err != nil {
|
|
||||||
log.WithField("error", err).Warn("Push aborted")
|
|
||||||
close(cancelCh)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if i < len(manifest.FSLayers) {
|
|
||||||
go func(i int) {
|
|
||||||
select {
|
|
||||||
case errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]):
|
|
||||||
case <-cancelCh: // recv broadcast notification about cancelation
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.PutImageManifest(name, tag, manifest)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"manifest": manifest,
|
|
||||||
}).Warn("Unable to upload manifest")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
|
|
||||||
log.WithField("layer", fsLayer).Info("Pushing layer")
|
|
||||||
|
|
||||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to read local layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
layerReader, err := layer.Reader()
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to read local layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer layerReader.Close()
|
|
||||||
|
|
||||||
if layerReader.CurrentSize() != layerReader.Size() {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"layer": fsLayer,
|
|
||||||
"currentSize": layerReader.CurrentSize(),
|
|
||||||
"size": layerReader.Size(),
|
|
||||||
}).Warn("Local layer incomplete")
|
|
||||||
return fmt.Errorf("Local layer incomplete")
|
|
||||||
}
|
|
||||||
|
|
||||||
length, err := c.BlobLength(name, fsLayer.BlobSum)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to check existence of remote layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if length >= 0 {
|
|
||||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
location, err := c.InitiateBlobUpload(name)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to upload layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"error": err,
|
|
||||||
"layer": fsLayer,
|
|
||||||
}).Warn("Unable to upload layer")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
Loading…
Reference in a new issue