package storage

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strings"

	"code.google.com/p/go-uuid/uuid"
	"github.com/docker/docker-registry/digest"
	"github.com/docker/docker-registry/storagedriver"
	"github.com/docker/docker/pkg/tarsum"

	"io"
)

// LayerUploadState captures the serializable state of the layer upload.
type LayerUploadState struct {
	// Name is the primary repository under which the layer will be linked.
	Name string

	// UUID identifies the upload.
	UUID string

	// Offset contains the current progress of the upload, in bytes.
	Offset int64
}

// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
	// LayerUploadState is embedded so the persisted fields (Name, UUID,
	// Offset) live directly on the controller.
	LayerUploadState

	layerStore  *layerStore      // blob/link storage the finished layer is written to
	uploadStore layerUploadStore // temporary storage for in-progress data and state
	fp          layerFile        // lazily opened upload file; nil until first use
	err         error            // terminal error, if set, controller is closed
}

// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
	io.WriteSeeker
	io.Reader
	io.Closer

	// Sync commits the contents of the writer to storage.
	Sync() (err error)
}

// layerUploadStore provides storage for temporary files and upload state of
// layers. This is used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move to layer.go when it's ready to go.
type layerUploadStore interface {
	// New begins tracking an upload for the named repository and returns
	// its initial state.
	New(name string) (LayerUploadState, error)

	// Open returns the file holding the in-progress data of the upload
	// identified by uuid.
	Open(uuid string) (layerFile, error)

	// GetState returns the last saved state of the upload identified by
	// uuid.
	GetState(uuid string) (LayerUploadState, error)

	// SaveState persists lus, keyed by lus.UUID.
	SaveState(lus LayerUploadState) error

	// DeleteState discards everything stored for the upload identified by
	// uuid.
	DeleteState(uuid string) error
}

// Compile-time check that layerUploadController satisfies LayerUpload.
var _ LayerUpload = &layerUploadController{}

// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
	return luc.LayerUploadState.Name
}

// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
	return luc.LayerUploadState.UUID
}

// Offset returns the number of bytes recorded as written to this upload so
// far.
func (luc *layerUploadController) Offset() int64 {
	return luc.LayerUploadState.Offset
}

// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and digest are validated against the
// contents of the uploaded layer. The digest is expected to be a tarsum
// digest; validation rejects anything other than tarsum version 1.
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
	// The upload file is read twice for now: once to validate the size and
	// digest, and once more to copy it into the blob store. Resumable hash
	// calculation during the upload itself would remove the first pass and
	// allow the data to be cut directly into the storage backend.
	upload, err := luc.file()
	if err != nil {
		// Cleanup?
		return nil, err
	}

	// From here on, use the digest computed by validation rather than the
	// one supplied by the caller.
	canonical, err := luc.validateLayer(upload, size, digest)
	if err != nil {
		return nil, err
	}

	err = luc.writeLayer(upload, size, canonical)
	if err != nil {
		// Cleanup?
		return nil, err
	}

	// The blob is stored; make it visible by linking it into the
	// repository.
	err = luc.linkLayer(canonical)
	if err != nil {
		return nil, err
	}

	// The upload is complete, so its temporary state is no longer needed.
	err = luc.uploadStore.DeleteState(luc.UUID())
	if err != nil {
		// Can we ignore this error?
		return nil, err
	}

	return luc.layerStore.Fetch(luc.Name(), canonical)
}

// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error { if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil { return err } return luc.Close() } func (luc *layerUploadController) Write(p []byte) (int, error) { wr, err := luc.file() if err != nil { return 0, err } n, err := wr.Write(p) // Because we expect the reported offset to be consistent with the storage // state, unfortunately, we need to Sync on every call to write. if err := wr.Sync(); err != nil { // Effectively, ignore the write state if the Sync fails. Report that // no bytes were written and seek back to the starting offset. offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET) if seekErr != nil { // What do we do here? Quite disasterous. luc.reset() return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr) } if offset != luc.Offset() { return 0, fmt.Errorf("unexpected offset after seek") } return 0, err } luc.LayerUploadState.Offset += int64(n) if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil { // TODO(stevvooe): This failure case may require more thought. return n, err } return n, err } func (luc *layerUploadController) Close() error { if luc.err != nil { return luc.err } if luc.fp != nil { luc.err = luc.fp.Close() } return luc.err } func (luc *layerUploadController) file() (layerFile, error) { if luc.fp != nil { return luc.fp, nil } fp, err := luc.uploadStore.Open(luc.UUID()) if err != nil { return nil, err } // TODO(stevvooe): We may need a more aggressive check here to ensure that // the file length is equal to the current offset. We may want to sync the // offset before return the layer upload to the client so it can be // validated before proceeding with any writes. // Seek to the current layer offset for good measure. if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil { return nil, err } luc.fp = fp return luc.fp, nil } // reset closes and drops the current writer. 
func (luc *layerUploadController) reset() { if luc.fp != nil { luc.fp.Close() luc.fp = nil } } // validateLayer runs several checks on the layer file to ensure its validity. // This is currently very expensive and relies on fast io and fast seek on the // local host. If successful, the latest digest is returned, which should be // used over the passed in value. func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) { // First, check the incoming tarsum version of the digest. version, err := tarsum.GetVersionFromTarsum(dgst.String()) if err != nil { return "", err } // TODO(stevvooe): Should we push this down into the digest type? switch version { case tarsum.Version1: default: // version 0 and dev, for now. return "", ErrLayerTarSumVersionUnsupported } digestVerifier := digest.NewDigestVerifier(dgst) lengthVerifier := digest.NewLengthVerifier(size) // First, seek to the end of the file, checking the size is as expected. end, err := fp.Seek(0, os.SEEK_END) if err != nil { return "", err } if end != size { // Fast path length check. return "", ErrLayerInvalidLength } // Now seek back to start and take care of the digest. if _, err := fp.Seek(0, os.SEEK_SET); err != nil { return "", err } tr := io.TeeReader(fp, lengthVerifier) tr = io.TeeReader(tr, digestVerifier) // TODO(stevvooe): This is one of the places we need a Digester write // sink. Instead, its read driven. This migth be okay. // Calculate an updated digest with the latest version. dgst, err = digest.FromReader(tr) if err != nil { return "", err } if !lengthVerifier.Verified() { return "", ErrLayerInvalidLength } if !digestVerifier.Verified() { return "", ErrLayerInvalidDigest } return dgst, nil } // writeLayer actually writes the the layer file into its final destination. // The layer should be validated before commencing the write. 
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest digest.Digest) error { blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{ alg: digest.Algorithm(), digest: digest.Hex(), }) if err != nil { return err } // Check for existence if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil { // TODO(stevvooe): This check is kind of problematic and very racy. switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. default: // TODO(stevvooe): This isn't actually an error: the blob store is // content addressable and we should just use this to ensure we // have it written. Although, we do need to verify that the // content that is there is the correct length. return err } } // Seek our local layer file back now. if _, err := fp.Seek(0, os.SEEK_SET); err != nil { // Cleanup? return err } // Okay: we can write the file to the blob store. if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil { return err } return nil } // linkLayer links a valid, written layer blog into the registry, first // linking the repository namespace, then adding it to the layerindex. func (luc *layerUploadController) linkLayer(digest digest.Digest) error { layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{ name: luc.Name(), digest: digest, }) if err != nil { return err } if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil { return nil } // Link the layer into the name index. layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{ digest: digest, }) if err != nil { return err } // Read back the name index file. If it exists, create it. If not, add the // new repo to the name list. // TODO(stevvooe): This is very racy, as well. Reconsider using list for // this operation? 
layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath) if err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: layerIndexLinkContent = []byte(luc.Name()) default: return err } } layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent) // Write the index content back to the index. return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent) } func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte { names := strings.Split(string(content), "\n") var found bool // Search the names and find ours for _, name := range names { if name == luc.Name() { found = true } } if !found { names = append(names, luc.Name()) } sort.Strings(names) return []byte(strings.Join(names, "\n")) } // localFSLayerUploadStore implements a local layerUploadStore. There are some // complexities around hashsums that make round tripping to the storage // backend problematic, so we'll store and read locally for now. By GO-beta, // this should be fully implemented on top of the backend storagedriver. // // For now, the directory layout is as follows: // // //registry-layer-upload/ // / // -> state.json // -> data // // Each upload, identified by uuid, has its own directory with a state file // and a data file. The state file has a json representation of the current // state. The data file is the in-progress upload data. 
type localFSLayerUploadStore struct { root string } func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) { path, err := ioutil.TempDir("", "registry-layer-upload") if err != nil { return nil, err } return &localFSLayerUploadStore{ root: path, }, nil } func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) { lus := LayerUploadState{ Name: name, UUID: uuid.New(), } if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil { return lus, err } return lus, nil } func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) { fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) if err != nil { return nil, err } return fp, nil } func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) { // TODO(stevvoe): Storing this state on the local file system is an // intermediate stop gap. This technique is unlikely to handle any kind of // concurrency very well. var lus LayerUploadState fp, err := os.Open(llufs.path(uuid, "state.json")) if err != nil { if os.IsNotExist(err) { return lus, ErrLayerUploadUnknown } return lus, err } defer fp.Close() dec := json.NewDecoder(fp) if err := dec.Decode(&lus); err != nil { return lus, err } return lus, nil } func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error { p, err := json.Marshal(lus) if err != nil { return err } err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644) if os.IsNotExist(err) { return ErrLayerUploadUnknown } return err } func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error { if err := os.RemoveAll(llufs.path(uuid, "")); err != nil { if os.IsNotExist(err) { return ErrLayerUploadUnknown } return err } return nil } func (llufs *localFSLayerUploadStore) path(uuid, file string) string { return filepath.Join(llufs.root, uuid, file) }