forked from TrueCloudLab/distribution
Refactoring cloudfactory layer handler into a more generic storage
middleware concept. This also breaks the dependency the storage package had on goamz Signed-off-by: David Lawrence <david.lawrence@docker.com> (github: endophage)
This commit is contained in:
parent
4ae9583092
commit
952f39edff
9 changed files with 139 additions and 238 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/docker/distribution/registry/storage"
|
"github.com/docker/distribution/registry/storage"
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||||
|
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
@ -41,8 +42,6 @@ type App struct {
|
||||||
sink notifications.Sink
|
sink notifications.Sink
|
||||||
source notifications.SourceRecord
|
source notifications.SourceRecord
|
||||||
}
|
}
|
||||||
|
|
||||||
layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Value intercepts calls context.Context.Value, returning the current app id,
|
// Value intercepts calls context.Context.Value, returning the current app id,
|
||||||
|
@ -101,14 +100,22 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
|
||||||
app.accessController = accessController
|
app.accessController = accessController
|
||||||
}
|
}
|
||||||
|
|
||||||
layerHandlerType := configuration.LayerHandler.Type()
|
for _, mw := range configuration.Middleware {
|
||||||
|
if mw.Inject == "registry" {
|
||||||
|
// registry middleware can director wrap app.registry identically to storage middlewares with driver
|
||||||
|
panic(fmt.Sprintf("unable to configure registry middleware (%s): %v", mw.Name, err))
|
||||||
|
} else if mw.Inject == "repository" {
|
||||||
|
// we have to do something more intelligent with repository middleware, It needs to be staged
|
||||||
|
// for later to be wrapped around the repository at request time.
|
||||||
|
panic(fmt.Sprintf("unable to configure repository middleware (%s): %v", mw.Name, err))
|
||||||
|
} else if mw.Inject == "storage" {
|
||||||
|
smw, err := storagemiddleware.GetStorageMiddleware(mw.Name, mw.Options, app.driver)
|
||||||
|
|
||||||
if layerHandlerType != "" {
|
|
||||||
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
|
panic(fmt.Sprintf("unable to configure storage middleware (%s): %v", mw.Name, err))
|
||||||
|
}
|
||||||
|
app.driver = smw
|
||||||
}
|
}
|
||||||
app.layerHandler = lh
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
|
@ -49,8 +49,8 @@ type layerHandler struct {
|
||||||
// response.
|
// response.
|
||||||
func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
|
func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
|
||||||
ctxu.GetLogger(lh).Debug("GetImageLayer")
|
ctxu.GetLogger(lh).Debug("GetImageLayer")
|
||||||
layers := lh.Repository.Layers()
|
layerStore := lh.Repository.Layers()
|
||||||
layer, err := layers.Fetch(lh.Digest)
|
layerReader, err := layerStore.Fetch(lh.Digest)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
|
@ -62,17 +62,6 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer layer.Close()
|
|
||||||
|
|
||||||
w.Header().Set("Docker-Content-Digest", lh.Digest.String())
|
layerReader.ServeHTTP(w, r)
|
||||||
|
|
||||||
if lh.layerHandler != nil {
|
|
||||||
handler, _ := lh.layerHandler.Resolve(layer)
|
|
||||||
if handler != nil {
|
|
||||||
handler.ServeHTTP(w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,95 +0,0 @@
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
||||||
)
|
|
||||||
|
|
||||||
// delegateLayerHandler provides a simple implementation of layerHandler that
|
|
||||||
// simply issues HTTP Temporary Redirects to the URL provided by the
|
|
||||||
// storagedriver for a given Layer.
|
|
||||||
type delegateLayerHandler struct {
|
|
||||||
storageDriver storagedriver.StorageDriver
|
|
||||||
pathMapper *pathMapper
|
|
||||||
duration time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ LayerHandler = &delegateLayerHandler{}
|
|
||||||
|
|
||||||
func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) {
|
|
||||||
duration := 20 * time.Minute
|
|
||||||
d, ok := options["duration"]
|
|
||||||
if ok {
|
|
||||||
switch d := d.(type) {
|
|
||||||
case time.Duration:
|
|
||||||
duration = d
|
|
||||||
case string:
|
|
||||||
dur, err := time.ParseDuration(d)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Invalid duration: %s", err)
|
|
||||||
}
|
|
||||||
duration = dur
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &delegateLayerHandler{storageDriver: storageDriver, pathMapper: defaultPathMapper, duration: duration}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolve returns an http.Handler which can serve the contents of the given
|
|
||||||
// Layer, or an error if not supported by the storagedriver.
|
|
||||||
func (lh *delegateLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) {
|
|
||||||
// TODO(bbland): This is just a sanity check to ensure that the
|
|
||||||
// storagedriver supports url generation. It would be nice if we didn't have
|
|
||||||
// to do this twice for non-GET requests.
|
|
||||||
layerURL, err := lh.urlFor(layer, map[string]interface{}{"method": "GET"})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if r.Method != "GET" {
|
|
||||||
layerURL, err = lh.urlFor(layer, map[string]interface{}{"method": r.Method})
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Redirect(w, r, layerURL, http.StatusTemporaryRedirect)
|
|
||||||
}), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// urlFor returns a download URL for the given layer, or the empty string if
|
|
||||||
// unsupported.
|
|
||||||
func (lh *delegateLayerHandler) urlFor(layer distribution.Layer, options map[string]interface{}) (string, error) {
|
|
||||||
// Crack open the layer to get at the layerStore
|
|
||||||
layerRd, ok := layer.(*layerReader)
|
|
||||||
if !ok {
|
|
||||||
// TODO(stevvooe): We probably want to find a better way to get at the
|
|
||||||
// underlying filesystem path for a given layer. Perhaps, the layer
|
|
||||||
// handler should have its own layer store but right now, it is not
|
|
||||||
// request scoped.
|
|
||||||
return "", fmt.Errorf("unsupported layer type: cannot resolve blob path: %v", layer)
|
|
||||||
}
|
|
||||||
|
|
||||||
if options == nil {
|
|
||||||
options = make(map[string]interface{})
|
|
||||||
}
|
|
||||||
options["expiry"] = time.Now().Add(lh.duration)
|
|
||||||
|
|
||||||
layerURL, err := lh.storageDriver.URLFor(layerRd.path, options)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return layerURL, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// init registers the delegate layerHandler backend.
|
|
||||||
func init() {
|
|
||||||
RegisterLayerHandler("delegate", LayerHandlerInitFunc(newDelegateLayerHandler))
|
|
||||||
}
|
|
|
@ -1,34 +1,36 @@
|
||||||
package storage
|
// Package middleware - cloudfront wrapper for storage libs
|
||||||
|
// N.B. currently only works with S3, not arbitrary sites
|
||||||
|
//
|
||||||
|
package middleware
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/AdRoll/goamz/cloudfront"
|
"github.com/AdRoll/goamz/cloudfront"
|
||||||
"github.com/docker/distribution"
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
|
||||||
)
|
)
|
||||||
|
|
||||||
// cloudFrontLayerHandler provides an simple implementation of layerHandler that
|
// cloudFrontStorageMiddleware provides an simple implementation of layerHandler that
|
||||||
// constructs temporary signed CloudFront URLs from the storagedriver layer URL,
|
// constructs temporary signed CloudFront URLs from the storagedriver layer URL,
|
||||||
// then issues HTTP Temporary Redirects to this CloudFront content URL.
|
// then issues HTTP Temporary Redirects to this CloudFront content URL.
|
||||||
type cloudFrontLayerHandler struct {
|
type cloudFrontStorageMiddleware struct {
|
||||||
|
storagedriver.StorageDriver
|
||||||
cloudfront *cloudfront.CloudFront
|
cloudfront *cloudfront.CloudFront
|
||||||
delegateLayerHandler *delegateLayerHandler
|
|
||||||
duration time.Duration
|
duration time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ LayerHandler = &cloudFrontLayerHandler{}
|
var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{}
|
||||||
|
|
||||||
// newCloudFrontLayerHandler constructs and returns a new CloudFront
|
// newCloudFrontLayerHandler constructs and returns a new CloudFront
|
||||||
// LayerHandler implementation.
|
// LayerHandler implementation.
|
||||||
// Required options: baseurl, privatekey, keypairid
|
// Required options: baseurl, privatekey, keypairid
|
||||||
func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) {
|
func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||||
base, ok := options["baseurl"]
|
base, ok := options["baseurl"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("No baseurl provided")
|
return nil, fmt.Errorf("No baseurl provided")
|
||||||
|
@ -68,12 +70,6 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
lh, err := newDelegateLayerHandler(storageDriver, options)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dlh := lh.(*delegateLayerHandler)
|
|
||||||
|
|
||||||
cf := cloudfront.New(baseURL, privateKey, keypairID)
|
cf := cloudfront.New(baseURL, privateKey, keypairID)
|
||||||
|
|
||||||
duration := 20 * time.Minute
|
duration := 20 * time.Minute
|
||||||
|
@ -91,33 +87,33 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &cloudFrontLayerHandler{cloudfront: cf, delegateLayerHandler: dlh, duration: duration}, nil
|
return &cloudFrontStorageMiddleware{StorageDriver: storageDriver, cloudfront: cf, duration: duration}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve returns an http.Handler which can serve the contents of the given
|
// Resolve returns an http.Handler which can serve the contents of the given
|
||||||
// Layer, or an error if not supported by the storagedriver.
|
// Layer, or an error if not supported by the storagedriver.
|
||||||
func (lh *cloudFrontLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) {
|
func (lh *cloudFrontStorageMiddleware) URLFor(path string, options map[string]interface{}) (string, error) {
|
||||||
layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil)
|
// TODO(endophage): currently only supports S3
|
||||||
|
options["expiry"] = time.Now().Add(lh.duration)
|
||||||
|
|
||||||
|
layerURLStr, err := lh.StorageDriver.URLFor(path, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
layerURL, err := url.Parse(layerURLStr)
|
layerURL, err := url.Parse(layerURLStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration))
|
cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", err
|
||||||
}
|
}
|
||||||
|
return cfURL, nil
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
http.Redirect(w, r, cfURL, http.StatusTemporaryRedirect)
|
|
||||||
}), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// init registers the cloudfront layerHandler backend.
|
// init registers the cloudfront layerHandler backend.
|
||||||
func init() {
|
func init() {
|
||||||
RegisterLayerHandler("cloudfront", LayerHandlerInitFunc(newCloudFrontLayerHandler))
|
storagemiddleware.RegisterStorageMiddleware("cloudfront", storagemiddleware.InitFunc(newCloudFrontStorageMiddleware))
|
||||||
}
|
}
|
40
docs/storage/driver/middleware/storagemiddleware.go
Normal file
40
docs/storage/driver/middleware/storagemiddleware.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
package storagemiddleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitFunc is the type of a StorageMiddleware factory function and is
|
||||||
|
// used to register the contsructor for different StorageMiddleware backends.
|
||||||
|
type InitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error)
|
||||||
|
|
||||||
|
var storageMiddlewares map[string]InitFunc
|
||||||
|
|
||||||
|
// RegisterStorageMiddleware is used to register an StorageMiddlewareInitFunc for
|
||||||
|
// a StorageMiddleware backend with the given name.
|
||||||
|
func RegisterStorageMiddleware(name string, initFunc InitFunc) error {
|
||||||
|
if storageMiddlewares == nil {
|
||||||
|
storageMiddlewares = make(map[string]InitFunc)
|
||||||
|
}
|
||||||
|
if _, exists := storageMiddlewares[name]; exists {
|
||||||
|
return fmt.Errorf("name already registered: %s", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
storageMiddlewares[name] = initFunc
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStorageMiddleware constructs a StorageMiddleware
|
||||||
|
// with the given options using the named backend.
|
||||||
|
func GetStorageMiddleware(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (storagedriver.StorageDriver, error) {
|
||||||
|
if storageMiddlewares != nil {
|
||||||
|
if initFunc, exists := storageMiddlewares[name]; exists {
|
||||||
|
return initFunc(storageDriver, options)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("no storage middleware registered with name: %s", name)
|
||||||
|
}
|
|
@ -1,51 +0,0 @@
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
||||||
)
|
|
||||||
|
|
||||||
// LayerHandler provides middleware for serving the contents of a Layer.
|
|
||||||
type LayerHandler interface {
|
|
||||||
// Resolve returns an http.Handler which can serve the contents of a given
|
|
||||||
// Layer if possible, or nil and an error when unsupported. This may
|
|
||||||
// directly serve the contents of the layer or issue a redirect to another
|
|
||||||
// URL hosting the content.
|
|
||||||
Resolve(layer distribution.Layer) (http.Handler, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LayerHandlerInitFunc is the type of a LayerHandler factory function and is
|
|
||||||
// used to register the contsructor for different LayerHandler backends.
|
|
||||||
type LayerHandlerInitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error)
|
|
||||||
|
|
||||||
var layerHandlers map[string]LayerHandlerInitFunc
|
|
||||||
|
|
||||||
// RegisterLayerHandler is used to register an LayerHandlerInitFunc for
|
|
||||||
// a LayerHandler backend with the given name.
|
|
||||||
func RegisterLayerHandler(name string, initFunc LayerHandlerInitFunc) error {
|
|
||||||
if layerHandlers == nil {
|
|
||||||
layerHandlers = make(map[string]LayerHandlerInitFunc)
|
|
||||||
}
|
|
||||||
if _, exists := layerHandlers[name]; exists {
|
|
||||||
return fmt.Errorf("name already registered: %s", name)
|
|
||||||
}
|
|
||||||
|
|
||||||
layerHandlers[name] = initFunc
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLayerHandler constructs a LayerHandler
|
|
||||||
// with the given options using the named backend.
|
|
||||||
func GetLayerHandler(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (LayerHandler, error) {
|
|
||||||
if layerHandlers != nil {
|
|
||||||
if initFunc, exists := layerHandlers[name]; exists {
|
|
||||||
return initFunc(storageDriver, options)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("no layer handler registered with name: %s", name)
|
|
||||||
}
|
|
|
@ -1,13 +1,14 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
)
|
)
|
||||||
|
|
||||||
// layerReadSeeker implements Layer and provides facilities for reading and
|
// LayerRead implements Layer and provides facilities for reading and
|
||||||
// seeking.
|
// seeking.
|
||||||
type layerReader struct {
|
type layerReader struct {
|
||||||
fileReader
|
fileReader
|
||||||
|
@ -17,6 +18,10 @@ type layerReader struct {
|
||||||
|
|
||||||
var _ distribution.Layer = &layerReader{}
|
var _ distribution.Layer = &layerReader{}
|
||||||
|
|
||||||
|
func (lrs *layerReader) Path() string {
|
||||||
|
return lrs.path
|
||||||
|
}
|
||||||
|
|
||||||
func (lrs *layerReader) Digest() digest.Digest {
|
func (lrs *layerReader) Digest() digest.Digest {
|
||||||
return lrs.digest
|
return lrs.digest
|
||||||
}
|
}
|
||||||
|
@ -33,3 +38,12 @@ func (lrs *layerReader) CreatedAt() time.Time {
|
||||||
func (lrs *layerReader) Close() error {
|
func (lrs *layerReader) Close() error {
|
||||||
return lrs.closeWithErr(distribution.ErrLayerClosed)
|
return lrs.closeWithErr(distribution.ErrLayerClosed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (lrs *layerReader) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Docker-Content-Digest", lrs.digest.String())
|
||||||
|
|
||||||
|
if url, err := lrs.fileReader.driver.URLFor(lrs.Path(), map[string]interface{}{}); err == nil {
|
||||||
|
http.Redirect(w, r, url, http.StatusTemporaryRedirect)
|
||||||
|
}
|
||||||
|
http.ServeContent(w, r, lrs.Digest().String(), lrs.CreatedAt(), lrs)
|
||||||
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &layerUploadController{
|
return &layerWriter{
|
||||||
layerStore: ls,
|
layerStore: ls,
|
||||||
uuid: uuid,
|
uuid: uuid,
|
||||||
startedAt: startedAt,
|
startedAt: startedAt,
|
||||||
|
|
|
@ -13,9 +13,11 @@ import (
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// layerUploadController is used to control the various aspects of resumable
|
var _ distribution.LayerUpload = &layerWriter{}
|
||||||
|
|
||||||
|
// layerWriter is used to control the various aspects of resumable
|
||||||
// layer upload. It implements the LayerUpload interface.
|
// layer upload. It implements the LayerUpload interface.
|
||||||
type layerUploadController struct {
|
type layerWriter struct {
|
||||||
layerStore *layerStore
|
layerStore *layerStore
|
||||||
|
|
||||||
uuid string
|
uuid string
|
||||||
|
@ -26,65 +28,64 @@ type layerUploadController struct {
|
||||||
bufferedFileWriter
|
bufferedFileWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ distribution.LayerUpload = &layerUploadController{}
|
var _ distribution.LayerUpload = &layerWriter{}
|
||||||
|
|
||||||
// UUID returns the identifier for this upload.
|
// UUID returns the identifier for this upload.
|
||||||
func (luc *layerUploadController) UUID() string {
|
func (lw *layerWriter) UUID() string {
|
||||||
return luc.uuid
|
return lw.uuid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (luc *layerUploadController) StartedAt() time.Time {
|
func (lw *layerWriter) StartedAt() time.Time {
|
||||||
return luc.startedAt
|
return lw.startedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish marks the upload as completed, returning a valid handle to the
|
// Finish marks the upload as completed, returning a valid handle to the
|
||||||
// uploaded layer. The final size and checksum are validated against the
|
// uploaded layer. The final size and checksum are validated against the
|
||||||
// contents of the uploaded layer. The checksum should be provided in the
|
// contents of the uploaded layer. The checksum should be provided in the
|
||||||
// format <algorithm>:<hex digest>.
|
// format <algorithm>:<hex digest>.
|
||||||
func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) {
|
func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) {
|
||||||
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish")
|
ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish")
|
||||||
|
|
||||||
err := luc.bufferedFileWriter.Close()
|
if err := lw.bufferedFileWriter.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
canonical, err := lw.validateLayer(digest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
canonical, err := luc.validateLayer(digest)
|
if err := lw.moveLayer(canonical); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := luc.moveLayer(canonical); err != nil {
|
|
||||||
// TODO(stevvooe): Cleanup?
|
// TODO(stevvooe): Cleanup?
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Link the layer blob into the repository.
|
// Link the layer blob into the repository.
|
||||||
if err := luc.linkLayer(canonical, digest); err != nil {
|
if err := lw.linkLayer(canonical, digest); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := luc.removeResources(); err != nil {
|
if err := lw.removeResources(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return luc.layerStore.Fetch(canonical)
|
return lw.layerStore.Fetch(canonical)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel the layer upload process.
|
// Cancel the layer upload process.
|
||||||
func (luc *layerUploadController) Cancel() error {
|
func (lw *layerWriter) Cancel() error {
|
||||||
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Cancel")
|
ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel")
|
||||||
if err := luc.removeResources(); err != nil {
|
if err := lw.removeResources(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
luc.Close()
|
lw.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// validateLayer checks the layer data against the digest, returning an error
|
// validateLayer checks the layer data against the digest, returning an error
|
||||||
// if it does not match. The canonical digest is returned.
|
// if it does not match. The canonical digest is returned.
|
||||||
func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) {
|
func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) {
|
||||||
digestVerifier, err := digest.NewDigestVerifier(dgst)
|
digestVerifier, err := digest.NewDigestVerifier(dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -96,7 +97,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
|
||||||
// then only have to fetch the difference.
|
// then only have to fetch the difference.
|
||||||
|
|
||||||
// Read the file from the backend driver and validate it.
|
// Read the file from the backend driver and validate it.
|
||||||
fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path)
|
fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -125,8 +126,8 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
|
||||||
// moveLayer moves the data into its final, hash-qualified destination,
|
// moveLayer moves the data into its final, hash-qualified destination,
|
||||||
// identified by dgst. The layer should be validated before commencing the
|
// identified by dgst. The layer should be validated before commencing the
|
||||||
// move.
|
// move.
|
||||||
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
|
func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
|
||||||
blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{
|
blobPath, err := lw.layerStore.repository.registry.pm.path(blobDataPathSpec{
|
||||||
digest: dgst,
|
digest: dgst,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -135,7 +136,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for existence
|
// Check for existence
|
||||||
if _, err := luc.driver.Stat(blobPath); err != nil {
|
if _, err := lw.driver.Stat(blobPath); err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
case storagedriver.PathNotFoundError:
|
case storagedriver.PathNotFoundError:
|
||||||
break // ensure that it doesn't exist.
|
break // ensure that it doesn't exist.
|
||||||
|
@ -154,7 +155,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
|
||||||
// the size here and write a zero-length file to blobPath if this is the
|
// the size here and write a zero-length file to blobPath if this is the
|
||||||
// case. For the most part, this should only ever happen with zero-length
|
// case. For the most part, this should only ever happen with zero-length
|
||||||
// tars.
|
// tars.
|
||||||
if _, err := luc.driver.Stat(luc.path); err != nil {
|
if _, err := lw.driver.Stat(lw.path); err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
case storagedriver.PathNotFoundError:
|
case storagedriver.PathNotFoundError:
|
||||||
// HACK(stevvooe): This is slightly dangerous: if we verify above,
|
// HACK(stevvooe): This is slightly dangerous: if we verify above,
|
||||||
|
@ -163,24 +164,24 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
|
||||||
// prevent this horrid thing, we employ the hack of only allowing
|
// prevent this horrid thing, we employ the hack of only allowing
|
||||||
// to this happen for the zero tarsum.
|
// to this happen for the zero tarsum.
|
||||||
if dgst == digest.DigestSha256EmptyTar {
|
if dgst == digest.DigestSha256EmptyTar {
|
||||||
return luc.driver.PutContent(blobPath, []byte{})
|
return lw.driver.PutContent(blobPath, []byte{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// We let this fail during the move below.
|
// We let this fail during the move below.
|
||||||
logrus.
|
logrus.
|
||||||
WithField("upload.uuid", luc.UUID()).
|
WithField("upload.uuid", lw.UUID()).
|
||||||
WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
|
WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
|
||||||
default:
|
default:
|
||||||
return err // unrelated error
|
return err // unrelated error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return luc.driver.Move(luc.path, blobPath)
|
return lw.driver.Move(lw.path, blobPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// linkLayer links a valid, written layer blob into the registry under the
|
// linkLayer links a valid, written layer blob into the registry under the
|
||||||
// named repository for the upload controller.
|
// named repository for the upload controller.
|
||||||
func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error {
|
func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error {
|
||||||
dgsts := append([]digest.Digest{canonical}, aliases...)
|
dgsts := append([]digest.Digest{canonical}, aliases...)
|
||||||
|
|
||||||
// Don't make duplicate links.
|
// Don't make duplicate links.
|
||||||
|
@ -192,8 +193,8 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...
|
||||||
}
|
}
|
||||||
seenDigests[dgst] = struct{}{}
|
seenDigests[dgst] = struct{}{}
|
||||||
|
|
||||||
layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{
|
layerLinkPath, err := lw.layerStore.repository.registry.pm.path(layerLinkPathSpec{
|
||||||
name: luc.layerStore.repository.Name(),
|
name: lw.layerStore.repository.Name(),
|
||||||
digest: dgst,
|
digest: dgst,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -201,7 +202,7 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil {
|
if err := lw.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -212,10 +213,10 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...
|
||||||
// removeResources should clean up all resources associated with the upload
|
// removeResources should clean up all resources associated with the upload
|
||||||
// instance. An error will be returned if the clean up cannot proceed. If the
|
// instance. An error will be returned if the clean up cannot proceed. If the
|
||||||
// resources are already not present, no error will be returned.
|
// resources are already not present, no error will be returned.
|
||||||
func (luc *layerUploadController) removeResources() error {
|
func (lw *layerWriter) removeResources() error {
|
||||||
dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{
|
dataPath, err := lw.layerStore.repository.registry.pm.path(uploadDataPathSpec{
|
||||||
name: luc.layerStore.repository.Name(),
|
name: lw.layerStore.repository.Name(),
|
||||||
uuid: luc.uuid,
|
uuid: lw.uuid,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -226,7 +227,7 @@ func (luc *layerUploadController) removeResources() error {
|
||||||
// upload related files.
|
// upload related files.
|
||||||
dirPath := path.Dir(dataPath)
|
dirPath := path.Dir(dataPath)
|
||||||
|
|
||||||
if err := luc.driver.Delete(dirPath); err != nil {
|
if err := lw.driver.Delete(dirPath); err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
case storagedriver.PathNotFoundError:
|
case storagedriver.PathNotFoundError:
|
||||||
break // already gone!
|
break // already gone!
|
Loading…
Reference in a new issue