diff --git a/registry/app.go b/registry/app.go index 72ac4f065..6a79cdfab 100644 --- a/registry/app.go +++ b/registry/app.go @@ -29,8 +29,6 @@ type App struct { // services contains the main services instance for the application. services *storage.Services - tokenProvider tokenProvider - layerHandler storage.LayerHandler accessController auth.AccessController @@ -66,8 +64,6 @@ func NewApp(configuration configuration.Configuration) *App { app.driver = driver app.services = storage.NewServices(app.driver) - app.tokenProvider = newHMACTokenProvider(configuration.HTTP.Secret) - authType := configuration.Auth.Type() if authType != "" { diff --git a/registry/hmac.go b/registry/hmac.go new file mode 100644 index 000000000..d24700875 --- /dev/null +++ b/registry/hmac.go @@ -0,0 +1,72 @@ +package registry + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "time" +) + +// layerUploadState captures the state serializable state of the layer upload. +type layerUploadState struct { + // name is the primary repository under which the layer will be linked. + Name string + + // UUID identifies the upload. + UUID string + + // offset contains the current progress of the upload. + Offset int64 + + // StartedAt is the original start time of the upload. + StartedAt time.Time +} + +type hmacKey string + +// unpackUploadState unpacks and validates the layer upload state from the +// token, using the hmacKey secret. +func (secret hmacKey) unpackUploadState(token string) (layerUploadState, error) { + var state layerUploadState + + tokenBytes, err := base64.URLEncoding.DecodeString(token) + if err != nil { + return state, err + } + mac := hmac.New(sha256.New, []byte(secret)) + + if len(tokenBytes) < mac.Size() { + return state, fmt.Errorf("Invalid token") + } + + macBytes := tokenBytes[:mac.Size()] + messageBytes := tokenBytes[mac.Size():] + + mac.Write(messageBytes) + if !hmac.Equal(mac.Sum(nil), macBytes) { + return state, fmt.Errorf("Invalid token") + } + + if err := json.Unmarshal(messageBytes, &state); err != nil { + return state, err + } + + return state, nil +} + +// packUploadState packs the upload state signed with and hmac digest using +// the hmacKey secret, encoding to url safe base64. The resulting token can be +// used to share data with minimized risk of external tampering. +func (secret hmacKey) packUploadState(lus layerUploadState) (string, error) { + mac := hmac.New(sha256.New, []byte(secret)) + p, err := json.Marshal(lus) + if err != nil { + return "", err + } + + mac.Write(p) + + return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), p...)), nil +} diff --git a/registry/tokens_test.go b/registry/hmac_test.go similarity index 69% rename from registry/tokens_test.go rename to registry/hmac_test.go index a447438a0..5ad60f61d 100644 --- a/registry/tokens_test.go +++ b/registry/hmac_test.go @@ -1,12 +1,8 @@ package registry -import ( - "testing" +import "testing" - "github.com/docker/distribution/storage" -) - -var layerUploadStates = []storage.LayerUploadState{ +var layerUploadStates = []layerUploadState{ { Name: "hello", UUID: "abcd-1234-qwer-0987", @@ -47,15 +43,15 @@ var secrets = []string{ // TestLayerUploadTokens constructs stateTokens from LayerUploadStates and // validates that the tokens can be used to reconstruct the proper upload state. func TestLayerUploadTokens(t *testing.T) { - tokenProvider := newHMACTokenProvider("supersecret") + secret := hmacKey("supersecret") for _, testcase := range layerUploadStates { - token, err := tokenProvider.layerUploadStateToToken(testcase) + token, err := secret.packUploadState(testcase) if err != nil { t.Fatal(err) } - lus, err := tokenProvider.layerUploadStateFromToken(token) + lus, err := secret.unpackUploadState(token) if err != nil { t.Fatal(err) } @@ -68,39 +64,39 @@ func TestLayerUploadTokens(t *testing.T) { // only if they share the same secret. func TestHMACValidation(t *testing.T) { for _, secret := range secrets { - tokenProvider1 := newHMACTokenProvider(secret) - tokenProvider2 := newHMACTokenProvider(secret) - badTokenProvider := newHMACTokenProvider("DifferentSecret") + secret1 := hmacKey(secret) + secret2 := hmacKey(secret) + badSecret := hmacKey("DifferentSecret") for _, testcase := range layerUploadStates { - token, err := tokenProvider1.layerUploadStateToToken(testcase) + token, err := secret1.packUploadState(testcase) if err != nil { t.Fatal(err) } - lus, err := tokenProvider2.layerUploadStateFromToken(token) + lus, err := secret2.unpackUploadState(token) if err != nil { t.Fatal(err) } assertLayerUploadStateEquals(t, testcase, lus) - _, err = badTokenProvider.layerUploadStateFromToken(token) + _, err = badSecret.unpackUploadState(token) if err == nil { t.Fatalf("Expected token provider to fail at retrieving state from token: %s", token) } - badToken, err := badTokenProvider.layerUploadStateToToken(testcase) + badToken, err := badSecret.packUploadState(lus) if err != nil { t.Fatal(err) } - _, err = tokenProvider1.layerUploadStateFromToken(badToken) + _, err = secret1.unpackUploadState(badToken) if err == nil { t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken) } - _, err = tokenProvider2.layerUploadStateFromToken(badToken) + _, err = secret2.unpackUploadState(badToken) if err == nil { t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken) } @@ -108,7 +104,7 @@ func TestHMACValidation(t *testing.T) { } } -func assertLayerUploadStateEquals(t *testing.T, expected storage.LayerUploadState, received storage.LayerUploadState) { +func assertLayerUploadStateEquals(t *testing.T, expected layerUploadState, received layerUploadState) { if expected.Name != received.Name { t.Fatalf("Expected Name=%q, Received Name=%q", expected.Name, received.Name) } diff --git a/registry/layerupload.go b/registry/layerupload.go index b694a6773..158bf7b4f 100644 --- a/registry/layerupload.go +++ b/registry/layerupload.go @@ -5,7 +5,7 @@ import ( "io" "net/http" "net/url" - "strconv" + "os" "github.com/Sirupsen/logrus" "github.com/docker/distribution/api/v2" @@ -33,26 +33,57 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { if luh.UUID != "" { luh.log = luh.log.WithField("uuid", luh.UUID) - state, err := ctx.tokenProvider.layerUploadStateFromToken(r.FormValue("_state")) + state, err := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state")) if err != nil { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logrus.Infof("error resolving upload: %v", err) - w.WriteHeader(http.StatusInternalServerError) - luh.Errors.Push(v2.ErrorCodeUnknown, err) + ctx.log.Infof("error resolving upload: %v", err) + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) + }) + } + luh.State = state + + if state.UUID != luh.UUID { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx.log.Infof("mismatched uuid in upload state: %q != %q", state.UUID, luh.UUID) + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) }) } layers := ctx.services.Layers() - upload, err := layers.Resume(state) + upload, err := layers.Resume(luh.Name, luh.UUID) if err != nil && err != storage.ErrLayerUploadUnknown { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logrus.Infof("error resolving upload: %v", err) - w.WriteHeader(http.StatusInternalServerError) - luh.Errors.Push(v2.ErrorCodeUnknown, err) + ctx.log.Errorf("error resolving upload: %v", err) + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadUnknown, err) }) } - luh.Upload = upload + + if state.Offset > 0 { + // Seek the layer upload to the correct spot if it's non-zero. + // These error conditions should be rare and demonstrate really + // problems. We basically cancel the upload and tell the client to + // start over. + if nn, err := upload.Seek(luh.State.Offset, os.SEEK_SET); err != nil { + ctx.log.Infof("error seeking layer upload: %v", err) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) + upload.Cancel() + }) + } else if nn != luh.State.Offset { + ctx.log.Infof("seek to wrong offest: %d != %d", nn, luh.State.Offset) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err) + upload.Cancel() + }) + } + } + handler = closeResources(handler, luh.Upload) } @@ -67,6 +98,8 @@ type layerUploadHandler struct { UUID string Upload storage.LayerUpload + + State layerUploadState } // StartLayerUpload begins the layer upload process and allocates a server- @@ -171,14 +204,30 @@ func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http. // chunk responses. This sets the correct headers but the response status is // left to the caller. func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { - values := make(url.Values) - stateToken, err := luh.Context.tokenProvider.layerUploadStateToToken(storage.LayerUploadState{Name: luh.Upload.Name(), UUID: luh.Upload.UUID(), Offset: luh.Upload.Offset()}) + + offset, err := luh.Upload.Seek(0, os.SEEK_CUR) + if err != nil { + luh.log.Errorf("unable get current offset of layer upload: %v", err) + return err + } + + // TODO(stevvooe): Need a better way to manage the upload state automatically. + luh.State.Name = luh.Name + luh.State.UUID = luh.Upload.UUID() + luh.State.Offset = offset + luh.State.StartedAt = luh.Upload.StartedAt() + + token, err := hmacKey(luh.Config.HTTP.Secret).packUploadState(luh.State) if err != nil { logrus.Infof("error building upload state token: %s", err) return err } - values.Set("_state", stateToken) - uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID(), values) + + uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL( + luh.Upload.Name(), luh.Upload.UUID(), + url.Values{ + "_state": []string{token}, + }) if err != nil { logrus.Infof("error building upload url: %s", err) return err @@ -186,7 +235,7 @@ func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *htt w.Header().Set("Location", uploadURL) w.Header().Set("Content-Length", "0") - w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset())) + w.Header().Set("Range", fmt.Sprintf("0-%d", luh.State.Offset)) return nil } @@ -198,7 +247,6 @@ var errNotReadyToComplete = fmt.Errorf("not ready to complete upload") func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error { // If we get a digest and length, we can finish the upload. dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters! - sizeStr := r.FormValue("size") if dgstStr == "" { return errNotReadyToComplete @@ -209,23 +257,13 @@ func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *htt return err } - var size int64 - if sizeStr != "" { - size, err = strconv.ParseInt(sizeStr, 10, 64) - if err != nil { - return err - } - } else { - size = -1 - } - - luh.completeUpload(w, r, size, dgst) + luh.completeUpload(w, r, dgst) return nil } // completeUpload finishes out the upload with the correct response. -func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) { - layer, err := luh.Upload.Finish(size, dgst) +func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, dgst digest.Digest) { + layer, err := luh.Upload.Finish(dgst) if err != nil { luh.Errors.Push(v2.ErrorCodeUnknown, err) w.WriteHeader(http.StatusInternalServerError) diff --git a/registry/tokens.go b/registry/tokens.go deleted file mode 100644 index 276b896e8..000000000 --- a/registry/tokens.go +++ /dev/null @@ -1,65 +0,0 @@ -package registry - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/json" - "fmt" - - "github.com/docker/distribution/storage" -) - -// tokenProvider contains methods for serializing and deserializing state from token strings. -type tokenProvider interface { - // layerUploadStateFromToken retrieves the LayerUploadState for a given state token. - layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) - - // layerUploadStateToToken returns a token string representing the given LayerUploadState. - layerUploadStateToToken(layerUploadState storage.LayerUploadState) (string, error) -} - -type hmacTokenProvider struct { - secret string -} - -func newHMACTokenProvider(secret string) tokenProvider { - return &hmacTokenProvider{secret: secret} -} - -// layerUploadStateFromToken deserializes the given HMAC stateToken and validates the prefix HMAC -func (ts *hmacTokenProvider) layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) { - var lus storage.LayerUploadState - - tokenBytes, err := base64.URLEncoding.DecodeString(stateToken) - if err != nil { - return lus, err - } - mac := hmac.New(sha256.New, []byte(ts.secret)) - - if len(tokenBytes) < mac.Size() { - return lus, fmt.Errorf("Invalid token") - } - - macBytes := tokenBytes[:mac.Size()] - messageBytes := tokenBytes[mac.Size():] - - mac.Write(messageBytes) - if !hmac.Equal(mac.Sum(nil), macBytes) { - return lus, fmt.Errorf("Invalid token") - } - - if err := json.Unmarshal(messageBytes, &lus); err != nil { - return lus, err - } - - return lus, nil -} - -// layerUploadStateToToken serializes the given LayerUploadState to JSON with an HMAC prepended -func (ts *hmacTokenProvider) layerUploadStateToToken(lus storage.LayerUploadState) (string, error) { - mac := hmac.New(sha256.New, []byte(ts.secret)) - stateJSON := fmt.Sprintf("{\"Name\": \"%s\", \"UUID\": \"%s\", \"Offset\": %d}", lus.Name, lus.UUID, lus.Offset) - mac.Write([]byte(stateJSON)) - return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), stateJSON...)), nil -} diff --git a/storage/filewriter.go b/storage/filewriter.go new file mode 100644 index 000000000..cfa7c93de --- /dev/null +++ b/storage/filewriter.go @@ -0,0 +1,153 @@ +package storage + +import ( + "bytes" + "fmt" + "io" + "os" + + "github.com/docker/distribution/storagedriver" +) + +// fileWriter implements a remote file writer backed by a storage driver. +type fileWriter struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + + // mutable fields + size int64 // size of the file, aka the current end + offset int64 // offset is the current write offset + err error // terminal error, if set, reader is closed +} + +// fileWriterInterface makes the desired io compliant interface that the +// filewriter should implement. +type fileWriterInterface interface { + io.WriteSeeker + io.WriterAt + io.ReaderFrom + io.Closer +} + +var _ fileWriterInterface = &fileWriter{} + +// newFileWriter returns a prepared fileWriter for the driver and path. This +// could be considered similar to an "open" call on a regular filesystem. +func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) { + fw := fileWriter{ + driver: driver, + path: path, + } + + if fi, err := driver.Stat(path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // ignore, offset is zero + default: + return nil, err + } + } else { + if fi.IsDir() { + return nil, fmt.Errorf("cannot write to a directory") + } + + fw.size = fi.Size() + } + + return &fw, nil +} + +// Write writes the buffer p at the current write offset. +func (fw *fileWriter) Write(p []byte) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), -1) + return int(nn), err +} + +// WriteAt writes p at the specified offset. The underlying offset does not +// change. +func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), offset) + return int(nn), err +} + +// ReadFrom reads reader r until io.EOF writing the contents at the current +// offset. +func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) { + return fw.readFromAt(r, -1) +} + +// Seek moves the write position do the requested offest based on the whence +// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET. +func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) { + if fw.err != nil { + return 0, fw.err + } + + var err error + newOffset := fw.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fw.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else if newOffset > fw.size { + fw.offset = newOffset + fw.size = newOffset + } else { + // No problems, set the offset. + fw.offset = newOffset + } + + return fw.offset, err +} + +// Close closes the fileWriter for writing. +func (fw *fileWriter) Close() error { + if fw.err != nil { + return fw.err + } + + fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) + + return fw.err +} + +// readFromAt writes to fw from r at the specified offset. If offset is less +// than zero, the value of fw.offset is used and updated after the operation. +func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) { + if fw.err != nil { + return 0, fw.err + } + + var updateOffset bool + if offset < 0 { + offset = fw.offset + updateOffset = true + } + + nn, err := fw.driver.WriteStream(fw.path, offset, r) + + if updateOffset { + // We should forward the offset, whether or not there was an error. + // Basically, we keep the filewriter in sync with the reader's head. If an + // error is encountered, the whole thing should be retried but we proceed + // from an expected offset, even if the data didn't make it to the + // backend. + fw.offset += nn + + if fw.offset > fw.size { + fw.size = fw.offset + } + } + + return nn, err +} diff --git a/storage/filewriter_test.go b/storage/filewriter_test.go new file mode 100644 index 000000000..2235462f8 --- /dev/null +++ b/storage/filewriter_test.go @@ -0,0 +1,148 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + "os" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver/inmemory" +) + +// TestSimpleWrite takes the fileWriter through common write operations +// ensuring data integrity. +func TestSimpleWrite(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + fw, err := newFileWriter(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + n, err = fw.Write(content) + if err != nil { + t.Fatalf("unexpected error writing content: %v", err) + } + + if n != len(content) { + t.Fatalf("unexpected write length: %d != %d", n, len(content)) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check the seek position is equal to the content length + end, err := fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Double the content, but use the WriteAt method + doubled := append(content, content...) + doubledgst, err := digest.FromReader(bytes.NewReader(doubled)) + if err != nil { + t.Fatalf("unexpected error digesting doubled content: %v", err) + } + + n, err = fw.WriteAt(content, end) + if err != nil { + t.Fatalf("unexpected error writing content at %d: %v", end, err) + } + + if n != len(content) { + t.Fatalf("writeat was short: %d != %d", n, len(content)) + } + + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check that WriteAt didn't update the offset. + end, err = fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Now, we copy from one path to another, running the data through the + // fileReader to fileWriter, rather than the driver.Move command to ensure + // everything is working correctly. + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + fw, err = newFileWriter(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + nn, err := io.Copy(fw, fr) + if err != nil { + t.Fatalf("unexpected error copying data: %v", err) + } + + if nn != int64(len(doubled)) { + t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled)) + } + + fr, err = newFileReader(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } +} diff --git a/storage/layer.go b/storage/layer.go index ec5f0f9de..24736c701 100644 --- a/storage/layer.go +++ b/storage/layer.go @@ -24,8 +24,7 @@ type Layer interface { // layers. Digest() digest.Digest - // CreatedAt returns the time this layer was created. Until we implement - // Stat call on storagedriver, this just returns the zero time. + // CreatedAt returns the time this layer was created. CreatedAt() time.Time } @@ -33,26 +32,22 @@ type Layer interface { // Instances can be obtained from the LayerService.Upload and // LayerService.Resume. type LayerUpload interface { - io.WriteCloser - - // UUID returns the identifier for this upload. - UUID() string + io.WriteSeeker + io.Closer // Name of the repository under which the layer will be linked. Name() string - // Offset returns the position of the last byte written to this layer. - Offset() int64 + // UUID returns the identifier for this upload. + UUID() string - // TODO(stevvooe): Consider completely removing the size check from this - // interface. The digest check may be adequate and we are making it - // optional in the HTTP API. + // StartedAt returns the time this layer upload was started. + StartedAt() time.Time // Finish marks the upload as completed, returning a valid handle to the - // uploaded layer. The final size and digest are validated against the - // contents of the uploaded layer. If the size is negative, only the - // digest will be checked. - Finish(size int64, digest digest.Digest) (Layer, error) + // uploaded layer. The digest is validated against the contents of the + // uploaded layer. + Finish(digest digest.Digest) (Layer, error) // Cancel the layer upload process. Cancel() error @@ -84,11 +79,11 @@ func (err ErrUnknownLayer) Error() string { // ErrLayerInvalidDigest returned when tarsum check fails. type ErrLayerInvalidDigest struct { - FSLayer manifest.FSLayer + Digest digest.Digest } func (err ErrLayerInvalidDigest) Error() string { - return fmt.Sprintf("invalid digest for referenced layer: %v", err.FSLayer.BlobSum) + return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest) } // ErrLayerInvalidSize returned when length check fails. diff --git a/storage/layer_test.go b/storage/layer_test.go index ec5b7406a..d6f4718aa 100644 --- a/storage/layer_test.go +++ b/storage/layer_test.go @@ -26,21 +26,18 @@ func TestSimpleLayerUpload(t *testing.T) { dgst := digest.Digest(tarSumStr) - uploadStore, err := newTemporaryLocalFSLayerUploadStore() if err != nil { t.Fatalf("error allocating upload store: %v", err) } imageName := "foo/bar" - driver := inmemory.New() ls := &layerStore{ - driver: driver, + driver: inmemory.New(), pathMapper: &pathMapper{ root: "/storage/testing", version: storagePathVersion, }, - uploadStore: uploadStore, } h := sha256.New() @@ -58,7 +55,7 @@ func TestSimpleLayerUpload(t *testing.T) { } // Do a resume, get unknown upload - layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) + layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) if err != ErrLayerUploadUnknown { t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) } @@ -84,26 +81,31 @@ func TestSimpleLayerUpload(t *testing.T) { t.Fatalf("layer data write incomplete") } - if layerUpload.Offset() != nn { - t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn) + offset, err := layerUpload.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("unexpected error seeking layer upload: %v", err) + } + + if offset != nn { + t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn) } layerUpload.Close() // Do a resume, for good fun - layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) + layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) if err != nil { t.Fatalf("unexpected error resuming upload: %v", err) } sha256Digest := digest.NewDigest("sha256", h) - layer, err := layerUpload.Finish(randomDataSize, dgst) + layer, err := layerUpload.Finish(dgst) if err != nil { t.Fatalf("unexpected error finishing layer upload: %v", err) } // After finishing an upload, it should no longer exist. - if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown { + if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown { t.Fatalf("expected layer upload to be unknown, got %v", err) } diff --git a/storage/layerstore.go b/storage/layerstore.go index f73bef6d2..41227cc5b 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -1,15 +1,17 @@ package storage import ( + "time" + + "code.google.com/p/go-uuid/uuid" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" ) type layerStore struct { - driver storagedriver.StorageDriver - pathMapper *pathMapper - uploadStore layerUploadStore + driver storagedriver.StorageDriver + pathMapper *pathMapper } func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) { @@ -66,31 +68,86 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) { // the same two layers. Should it be disallowed? For now, we allow both // parties to proceed and the the first one uploads the layer. - lus, err := ls.uploadStore.New(name) + uuid := uuid.New() + startedAt := time.Now().UTC() + + path, err := ls.pathMapper.path(uploadDataPathSpec{ + name: name, + uuid: uuid, + }) + if err != nil { return nil, err } - return ls.newLayerUpload(lus), nil + startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ + name: name, + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + // Write a startedat file for this upload + if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + return nil, err + } + + return ls.newLayerUpload(name, uuid, path, startedAt) } // Resume continues an in progress layer upload, returning the current // state of the upload. -func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) { - _, err := ls.uploadStore.GetState(lus.UUID) +func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) { + startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ + name: name, + uuid: uuid, + }) if err != nil { return nil, err } - return ls.newLayerUpload(lus), nil + startedAtBytes, err := ls.driver.GetContent(startedAtPath) + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return nil, ErrLayerUploadUnknown + default: + return nil, err + } + } + + startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) + if err != nil { + return nil, err + } + + path, err := ls.pathMapper.path(uploadDataPathSpec{ + name: name, + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + return ls.newLayerUpload(name, uuid, path, startedAt) } // newLayerUpload allocates a new upload controller with the given state. -func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload { - return &layerUploadController{ - LayerUploadState: lus, - layerStore: ls, - uploadStore: ls.uploadStore, +func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) { + fw, err := newFileWriter(ls.driver, path) + if err != nil { + return nil, err } + + return &layerUploadController{ + layerStore: ls, + name: name, + uuid: uuid, + startedAt: startedAt, + fileWriter: *fw, + }, nil } diff --git a/storage/layerupload.go b/storage/layerupload.go index 3175a09ef..b9953b236 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -1,229 +1,84 @@ package storage import ( - "fmt" "io" - "io/ioutil" - "os" - "path/filepath" - - "code.google.com/p/go-uuid/uuid" + "path" + "time" + "github.com/Sirupsen/logrus" "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" "github.com/docker/docker/pkg/tarsum" ) -// LayerUploadState captures the state serializable state of the layer upload. -type LayerUploadState struct { - // name is the primary repository under which the layer will be linked. - Name string - - // UUID identifies the upload. - UUID string - - // offset contains the current progress of the upload. - Offset int64 -} - // layerUploadController is used to control the various aspects of resumable // layer upload. It implements the LayerUpload interface. type layerUploadController struct { - LayerUploadState + layerStore *layerStore - layerStore *layerStore - uploadStore layerUploadStore - fp layerFile - err error // terminal error, if set, controller is closed -} + name string + uuid string + startedAt time.Time -// layerFile documents the interface used while writing layer files, similar -// to *os.File. This is separate from layerReader, for now, because we want to -// store uploads on the local file system until we have write-through hashing -// support. They should be combined once this is worked out. -type layerFile interface { - io.WriteSeeker - io.Reader - io.Closer - - // Sync commits the contents of the writer to storage. - Sync() (err error) -} - -// layerUploadStore provides storage for temporary files and upload state of -// layers. This is be used by the LayerService to manage the state of ongoing -// uploads. This interface will definitely change and will most likely end up -// being exported to the app layer. Move the layer.go when it's ready to go. -type layerUploadStore interface { - New(name string) (LayerUploadState, error) - Open(uuid string) (layerFile, error) - GetState(uuid string) (LayerUploadState, error) - // TODO: factor this method back in - // SaveState(lus LayerUploadState) error - DeleteState(uuid string) error + fileWriter } var _ LayerUpload = &layerUploadController{} // Name of the repository under which the layer will be linked. func (luc *layerUploadController) Name() string { - return luc.LayerUploadState.Name + return luc.name } // UUID returns the identifier for this upload. func (luc *layerUploadController) UUID() string { - return luc.LayerUploadState.UUID + return luc.uuid } -// Offset returns the position of the last byte written to this layer. -func (luc *layerUploadController) Offset() int64 { - return luc.LayerUploadState.Offset +func (luc *layerUploadController) StartedAt() time.Time { + return luc.startedAt } // Finish marks the upload as completed, returning a valid handle to the // uploaded layer. The final size and checksum are validated against the // contents of the uploaded layer. The checksum should be provided in the // format :. -func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) { - - // This section is going to be pretty ugly now. We will have to read the - // file twice. First, to get the tarsum and checksum. When those are - // available, and validated, we will upload it to the blob store and link - // it into the repository. In the future, we need to use resumable hash - // calculations for tarsum and checksum that can be calculated during the - // upload. This will allow us to cut the data directly into a temporary - // directory in the storage backend. - - fp, err := luc.file() - - if err != nil { - // Cleanup? - return nil, err - } - - digest, err = luc.validateLayer(fp, size, digest) +func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) { + canonical, err := luc.validateLayer(digest) if err != nil { return nil, err } - if nn, err := luc.writeLayer(fp, digest); err != nil { - // Cleanup? - return nil, err - } else if size >= 0 && nn != size { - // TODO(stevvooe): Short write. Will have to delete the location and - // report an error. This error needs to be reported to the client. - return nil, fmt.Errorf("short write writing layer") - } - - // Yes! We have written some layer data. Let's make it visible. Link the - // layer blob into the repository. - if err := luc.linkLayer(digest); err != nil { + if err := luc.moveLayer(canonical); err != nil { + // TODO(stevvooe): Cleanup? return nil, err } - // Ok, the upload has completed and finished. Delete the state. - if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil { - // Can we ignore this error? + // Link the layer blob into the repository. + if err := luc.linkLayer(canonical); err != nil { return nil, err } - return luc.layerStore.Fetch(luc.Name(), digest) + if err := luc.removeResources(); err != nil { + return nil, err + } + + return luc.layerStore.Fetch(luc.Name(), canonical) } // Cancel the layer upload process. func (luc *layerUploadController) Cancel() error { - if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil { + if err := luc.removeResources(); err != nil { return err } - return luc.Close() + luc.Close() + return nil } -func (luc *layerUploadController) Write(p []byte) (int, error) { - wr, err := luc.file() - if err != nil { - return 0, err - } - - n, err := wr.Write(p) - - // Because we expect the reported offset to be consistent with the storage - // state, unfortunately, we need to Sync on every call to write. - if err := wr.Sync(); err != nil { - // Effectively, ignore the write state if the Sync fails. Report that - // no bytes were written and seek back to the starting offset. - offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET) - if seekErr != nil { - // What do we do here? Quite disasterous. - luc.reset() - - return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr) - } - - if offset != luc.Offset() { - return 0, fmt.Errorf("unexpected offset after seek") - } - - return 0, err - } - - luc.LayerUploadState.Offset += int64(n) - - return n, err -} - -func (luc *layerUploadController) Close() error { - if luc.err != nil { - return luc.err - } - - if luc.fp != nil { - luc.err = luc.fp.Close() - } - - return luc.err -} - -func (luc *layerUploadController) file() (layerFile, error) { - if luc.fp != nil { - return luc.fp, nil - } - - fp, err := luc.uploadStore.Open(luc.UUID()) - - if err != nil { - return nil, err - } - - // TODO(stevvooe): We may need a more aggressive check here to ensure that - // the file length is equal to the current offset. We may want to sync the - // offset before return the layer upload to the client so it can be - // validated before proceeding with any writes. - - // Seek to the current layer offset for good measure. - if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil { - return nil, err - } - - luc.fp = fp - - return luc.fp, nil -} - -// reset closes and drops the current writer. -func (luc *layerUploadController) reset() { - if luc.fp != nil { - luc.fp.Close() - luc.fp = nil - } -} - -// validateLayer runs several checks on the layer file to ensure its validity. -// This is currently very expensive and relies on fast io and fast seek on the -// local host. If successful, the latest digest is returned, which should be -// used over the passed in value. -func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) { +// validateLayer checks the layer data against the digest, returning an error +// if it does not match. The canonical digest is returned. +func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { // First, check the incoming tarsum version of the digest. version, err := tarsum.GetVersionFromTarsum(dgst.String()) if err != nil { @@ -239,87 +94,65 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d } digestVerifier := digest.NewDigestVerifier(dgst) - lengthVerifier := digest.NewLengthVerifier(size) - // First, seek to the end of the file, checking the size is as expected. - end, err := fp.Seek(0, os.SEEK_END) + // TODO(stevvooe): Store resumable hash calculations in upload directory + // in driver. Something like a file at path /resumablehash/ + // with the hash state up to that point would be perfect. The hasher would + // then only have to fetch the difference. + + // Read the file from the backend driver and validate it. + fr, err := newFileReader(luc.fileWriter.driver, luc.path) if err != nil { return "", err } - // Only check size if it is greater than - if size >= 0 && end != size { - // Fast path length check. - return "", ErrLayerInvalidSize{Size: size} - } - - // Now seek back to start and take care of the digest. - if _, err := fp.Seek(0, os.SEEK_SET); err != nil { - return "", err - } - - tr := io.TeeReader(fp, digestVerifier) - - // Only verify the size if a positive size argument has been passed. - if size >= 0 { - tr = io.TeeReader(tr, lengthVerifier) - } + tr := io.TeeReader(fr, digestVerifier) // TODO(stevvooe): This is one of the places we need a Digester write - // sink. Instead, its read driven. This migth be okay. + // sink. Instead, its read driven. This might be okay. // Calculate an updated digest with the latest version. - dgst, err = digest.FromReader(tr) + canonical, err := digest.FromReader(tr) if err != nil { return "", err } - if size >= 0 && !lengthVerifier.Verified() { - return "", ErrLayerInvalidSize{Size: size} - } - if !digestVerifier.Verified() { - return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: dgst}} + return "", ErrLayerInvalidDigest{Digest: dgst} } - return dgst, nil + return canonical, nil } -// writeLayer actually writes the the layer file into its final destination, +// moveLayer moves the data into its final, hash-qualified destination, // identified by dgst. The layer should be validated before commencing the -// write. -func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) { +// move. +func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{ digest: dgst, }) if err != nil { - return 0, err + return err } // Check for existence if _, err := luc.layerStore.driver.Stat(blobPath); err != nil { - // TODO(stevvooe): This check is kind of problematic and very racy. switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. default: - // TODO(stevvooe): This isn't actually an error: the blob store is - // content addressable and we should just use this to ensure we - // have it written. Although, we do need to verify that the - // content that is there is the correct length. - return 0, err + return err } + } else { + // If the path exists, we can assume that the content has already + // been uploaded, since the blob storage is content-addressable. + // While it may be corrupted, detection of such corruption belongs + // elsewhere. + return nil } - // Seek our local layer file back now. - if _, err := fp.Seek(0, os.SEEK_SET); err != nil { - // Cleanup? - return 0, err - } - - // Okay: we can write the file to the blob store. - return luc.layerStore.driver.WriteStream(blobPath, 0, fp) + return luc.driver.Move(luc.path, blobPath) } // linkLayer links a valid, written layer blob into the registry under the @@ -337,85 +170,35 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error { return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)) } -// localFSLayerUploadStore implements a local layerUploadStore. There are some -// complexities around hashsums that make round tripping to the storage -// backend problematic, so we'll store and read locally for now. By GO-beta, -// this should be fully implemented on top of the backend storagedriver. -// -// For now, the directory layout is as follows: -// -// //registry-layer-upload/ -// / -// -> state.json -// -> data -// -// Each upload, identified by uuid, has its own directory with a state file -// and a data file. The state file has a json representation of the current -// state. The data file is the in-progress upload data. -type localFSLayerUploadStore struct { - root string -} - -func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) { - path, err := ioutil.TempDir("", "registry-layer-upload") +// removeResources should clean up all resources associated with the upload +// instance. An error will be returned if the clean up cannot proceed. If the +// resources are already not present, no error will be returned. +func (luc *layerUploadController) removeResources() error { + dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{ + name: luc.name, + uuid: luc.uuid, + }) if err != nil { - return nil, err - } - - return &localFSLayerUploadStore{ - root: path, - }, nil -} - -func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) { - lus := LayerUploadState{ - Name: name, - UUID: uuid.New(), - } - - if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil { - return lus, err - } - - return lus, nil -} - -func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) { - fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) - - if err != nil { - return nil, err - } - - return fp, nil -} - -func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) { - var lus LayerUploadState - - if _, err := os.Stat(llufs.path(uuid, "")); err != nil { - if os.IsNotExist(err) { - return lus, ErrLayerUploadUnknown - } - - return lus, err - } - return lus, nil -} - -func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error { - if err := os.RemoveAll(llufs.path(uuid, "")); err != nil { - if os.IsNotExist(err) { - return ErrLayerUploadUnknown - } - return err } + // Resolve and delete the containing directory, which should include any + // upload related files. + dirPath := path.Dir(dataPath) + + if err := luc.driver.Delete(dirPath); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + break // already gone! + default: + // This should be uncommon enough such that returning an error + // should be okay. At this point, the upload should be mostly + // complete, but perhaps the backend became unaccessible. + logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err) + return err + } + } + return nil } - -func (llufs *localFSLayerUploadStore) path(uuid, file string) string { - return filepath.Join(llufs.root, uuid, file) -} diff --git a/storage/manifeststore_test.go b/storage/manifeststore_test.go index 991028e56..a6cca9627 100644 --- a/storage/manifeststore_test.go +++ b/storage/manifeststore_test.go @@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) { panic("not implemented") } -func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) { +func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) { panic("not implemented") } diff --git a/storage/paths.go b/storage/paths.go index c5d6c90fc..0724b2865 100644 --- a/storage/paths.go +++ b/storage/paths.go @@ -23,20 +23,26 @@ const storagePathVersion = "v2" // // -> layers/ // +// -> uploads/ +// data +// startedat // -> blob/ // // // There are few important components to this path layout. First, we have the // repository store identified by name. This contains the image manifests and -// a layer store with links to CAS blob ids. Outside of the named repo area, -// we have the the blob store. It contains the actual layer data and any other -// data that can be referenced by a CAS id. +// a layer store with links to CAS blob ids. Upload coordination data is also +// stored here. Outside of the named repo area, we have the the blob store. It +// contains the actual layer data and any other data that can be referenced by +// a CAS id. // // We cover the path formats implemented by this path mapper below. // // manifestPathSpec: /v2/repositories//manifests/ // layerLinkPathSpec: /v2/repositories//layers/tarsum/// // blobPathSpec: /v2/blob/// +// uploadDataPathSpec: /v2/repositories//uploads//data +// uploadStartedAtPathSpec: /v2/repositories//uploads//startedat // // For more information on the semantic meaning of each path and their // contents, please see the path spec documentation. @@ -103,6 +109,10 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) { blobPathPrefix := append(rootPrefix, "blob") return path.Join(append(blobPathPrefix, components...)...), nil + case uploadDataPathSpec: + return path.Join(append(repoPrefix, v.name, "uploads", v.uuid, "data")...), nil + case uploadStartedAtPathSpec: + return path.Join(append(repoPrefix, v.name, "uploads", v.uuid, "startedat")...), nil default: // TODO(sday): This is an internal error. Ensure it doesn't escape (panic?). return "", fmt.Errorf("unknown path spec: %#v", v) @@ -170,6 +180,29 @@ type blobPathSpec struct { func (blobPathSpec) pathSpec() {} +// uploadDataPathSpec defines the path parameters of the data file for +// uploads. +type uploadDataPathSpec struct { + name string + uuid string +} + +func (uploadDataPathSpec) pathSpec() {} + +// uploadDataPathSpec defines the path parameters for the file that stores the +// start time of an uploads. If it is missing, the upload is considered +// unknown. Admittedly, the presence of this file is an ugly hack to make sure +// we have a way to cleanup old or stalled uploads that doesn't rely on driver +// FileInfo behavior. If we come up with a more clever way to do this, we +// should remove this file immediately and rely on the startetAt field from +// the client to enforce time out policies. +type uploadStartedAtPathSpec struct { + name string + uuid string +} + +func (uploadStartedAtPathSpec) pathSpec() {} + // digestPathComoponents provides a consistent path breakdown for a given // digest. For a generic digest, it will be as follows: // diff --git a/storage/paths_test.go b/storage/paths_test.go index 7b91865f7..3a5ea899d 100644 --- a/storage/paths_test.go +++ b/storage/paths_test.go @@ -43,10 +43,18 @@ func TestPathMapper(t *testing.T) { expected: "/pathmapper-test/blob/tarsum/v1/sha256/ab/abcdefabcdefabcdef908909909", }, { - spec: blobPathSpec{ - digest: digest.Digest("tarsum+sha256:abcdefabcdefabcdef908909909"), + spec: uploadDataPathSpec{ + name: "foo/bar", + uuid: "asdf-asdf-asdf-adsf", }, - expected: "/pathmapper-test/blob/tarsum/v0/sha256/ab/abcdefabcdefabcdef908909909", + expected: "/pathmapper-test/repositories/foo/bar/uploads/asdf-asdf-asdf-adsf/data", + }, + { + spec: uploadStartedAtPathSpec{ + name: "foo/bar", + uuid: "asdf-asdf-asdf-adsf", + }, + expected: "/pathmapper-test/repositories/foo/bar/uploads/asdf-asdf-asdf-adsf/startedat", }, } { p, err := pm.path(testcase.spec) diff --git a/storage/services.go b/storage/services.go index 5507faebd..97edca3fc 100644 --- a/storage/services.go +++ b/storage/services.go @@ -9,28 +9,18 @@ import ( // Services provides various services with application-level operations for // use across backend storage drivers. type Services struct { - driver storagedriver.StorageDriver - pathMapper *pathMapper - layerUploadStore layerUploadStore + driver storagedriver.StorageDriver + pathMapper *pathMapper } // NewServices creates a new Services object to access docker objects stored // in the underlying driver. func NewServices(driver storagedriver.StorageDriver) *Services { - layerUploadStore, err := newTemporaryLocalFSLayerUploadStore() - - if err != nil { - // TODO(stevvooe): This failure needs to be understood in the context - // of the lifecycle of the services object, which is uncertain at this - // point. - panic("unable to allocate layerUploadStore: " + err.Error()) - } return &Services{ driver: driver, // TODO(sday): This should be configurable. - pathMapper: defaultPathMapper, - layerUploadStore: layerUploadStore, + pathMapper: defaultPathMapper, } } @@ -38,7 +28,7 @@ func NewServices(driver storagedriver.StorageDriver) *Services { // may be context sensitive in the future. The instance should be used similar // to a request local. func (ss *Services) Layers() LayerService { - return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore} + return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper} } // Manifests returns an instance of ManifestService. Instantiation is cheap and @@ -78,7 +68,8 @@ type LayerService interface { // returning a handle. Upload(name string) (LayerUpload, error) - // Resume continues an in progress layer upload, returning the current - // state of the upload. - Resume(layerUploadState LayerUploadState) (LayerUpload, error) + // Resume continues an in progress layer upload, returning a handle to the + // upload. The caller should seek to the latest desired upload location + // before proceeding. + Resume(name, uuid string) (LayerUpload, error) } diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index e3c63f741..c2be1913b 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -133,6 +133,11 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn in return 0, fmt.Errorf("not a file") } + // Unlock while we are reading from the source, in case we are reading + // from the same mfs instance. This can be fixed by a more granular + // locking model. + d.mutex.Unlock() + d.mutex.RLock() // Take the readlock to block other writers. var buf bytes.Buffer nn, err = buf.ReadFrom(reader) @@ -142,9 +147,13 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn in // backend. What is the correct return value? Really, the caller needs // to know that the reader has been advanced and reattempting the // operation is incorrect. + d.mutex.RUnlock() + d.mutex.Lock() return nn, err } + d.mutex.RUnlock() + d.mutex.Lock() f.WriteAt(buf.Bytes(), offset) return nn, err }