diff --git a/storage/digest.go b/storage/digest.go
new file mode 100644
index 000000000..db5c884be
--- /dev/null
+++ b/storage/digest.go
@@ -0,0 +1,59 @@
+package storage
+
+import (
+	"fmt"
+	"hash"
+	"strings"
+)
+
+// Digest allows simple protection of hex-formatted digest strings, prefixed
+// by their algorithm. Strings of type Digest have some guarantee of being in
+// the correct format and provide quick access to the components of a
+// digest string.
+//
+// The following is an example of the contents of Digest types:
+//
+//	sha256:7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc
+//
+type Digest string
+
+// NewDigest returns a Digest from alg and a hash.Hash object.
+func NewDigest(alg string, h hash.Hash) Digest {
+	return Digest(fmt.Sprintf("%s:%x", alg, h.Sum(nil)))
+}
+
+var (
+	// ErrDigestInvalidFormat is returned when the digest format is invalid.
+	ErrDigestInvalidFormat = fmt.Errorf("invalid checksum digest format")
+
+	// ErrDigestUnsupported is returned when the digest algorithm is unsupported by the registry.
+	ErrDigestUnsupported = fmt.Errorf("unsupported digest algorithm")
+)
+
+// ParseDigest parses s and returns the validated digest object. An error will
+// be returned if the format is invalid.
+func ParseDigest(s string) (Digest, error) {
+	parts := strings.SplitN(s, ":", 2)
+	if len(parts) != 2 {
+		return "", ErrDigestInvalidFormat
+	}
+
+	switch parts[0] {
+	case "sha256":
+		break
+	default:
+		return "", ErrDigestUnsupported
+	}
+
+	return Digest(s), nil
+}
+
+// Algorithm returns the algorithm portion of the digest.
+func (d Digest) Algorithm() string {
+	return strings.SplitN(string(d), ":", 2)[0]
+}
+
+// Hex returns the hex digest portion of the digest.
+func (d Digest) Hex() string {
+	return strings.SplitN(string(d), ":", 2)[1]
+}
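For reference, a minimal sketch of how these types compose — hashing some content, producing a Digest, and round-tripping it through ParseDigest. The helper name is invented for illustration and is not part of the diff:

package storage

import (
	"crypto/sha256"
	"fmt"
)

// digestExample shows the expected round trip. Hypothetical helper for
// illustration only.
func digestExample() error {
	h := sha256.New()
	h.Write([]byte("layer contents"))

	// NewDigest renders "sha256:<hex digest>" from the hash state.
	d := NewDigest("sha256", h)

	// ParseDigest validates the algorithm prefix and overall format.
	parsed, err := ParseDigest(string(d))
	if err != nil {
		return err
	}

	fmt.Println(parsed.Algorithm()) // "sha256"
	fmt.Println(parsed.Hex())       // the 64-character hex portion
	return nil
}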
diff --git a/storage/layer.go b/storage/layer.go
new file mode 100644
index 000000000..bae697015
--- /dev/null
+++ b/storage/layer.go
@@ -0,0 +1,96 @@
+package storage
+
+import (
+	"fmt"
+	"io"
+	"time"
+)
+
+// LayerService provides operations on layer files in backend storage.
+type LayerService interface {
+	// Exists returns true if the layer exists.
+	Exists(tarSum string) (bool, error)
+
+	// Fetch the layer identified by TarSum.
+	Fetch(tarSum string) (Layer, error)
+
+	// Upload begins a layer upload, returning a handle. If the layer upload
+	// is already in progress or the layer has already been uploaded, this
+	// will return an error.
+	Upload(name, tarSum string) (LayerUpload, error)
+
+	// Resume continues an in-progress layer upload, returning the current
+	// state of the upload.
+	Resume(name, tarSum, uuid string) (LayerUpload, error)
+}
+
+// Layer provides a readable and seekable layer object. Typically,
+// implementations are *not* goroutine safe.
+type Layer interface {
+	// http.ServeContent requires an efficient implementation of
+	// ReadSeeker.Seek(0, os.SEEK_END).
+	io.ReadSeeker
+	io.Closer
+
+	// Name returns the repository under which this layer is linked.
+	Name() string // TODO(stevvooe): struggling with nomenclature: should this be "repo" or "name"?
+
+	// TarSum returns the unique tarsum of the layer.
+	TarSum() string
+
+	// CreatedAt returns the time this layer was created. Until we implement
+	// a Stat call on storagedriver, this just returns the zero time.
+	CreatedAt() time.Time
+}
+
+// LayerUpload provides a handle for working with in-progress uploads.
+// Instances can be obtained from LayerService.Upload and
+// LayerService.Resume.
+type LayerUpload interface {
+	io.WriteCloser
+
+	// UUID returns the identifier for this upload.
+	UUID() string
+
+	// Name of the repository under which the layer will be linked.
+	Name() string
+
+	// TarSum identifier of the proposed layer. Resulting data must match this
+	// tarsum.
+	TarSum() string
+
+	// Offset returns the position of the last byte written to this layer.
+	Offset() int64
+
+	// Finish marks the upload as completed, returning a valid handle to the
+	// uploaded layer. The final size and checksum are validated against the
+	// contents of the uploaded layer. The checksum should be provided in the
+	// format <algorithm>:<hex digest>.
+	Finish(size int64, digest string) (Layer, error)
+
+	// Cancel the layer upload process.
+	Cancel() error
+}
+
+var (
+	// ErrLayerUnknown is returned when a layer cannot be found.
+	ErrLayerUnknown = fmt.Errorf("unknown layer")
+
+	// ErrLayerExists is returned when the layer already exists.
+	ErrLayerExists = fmt.Errorf("layer exists")
+
+	// ErrLayerTarSumVersionUnsupported is returned when the tarsum is an unsupported version.
+	ErrLayerTarSumVersionUnsupported = fmt.Errorf("unsupported tarsum version")
+
+	// ErrLayerUploadUnknown is returned when an upload is not found.
+	ErrLayerUploadUnknown = fmt.Errorf("layer upload unknown")
+
+	// ErrLayerInvalidChecksum is returned when the checksum/digest check fails.
+	ErrLayerInvalidChecksum = fmt.Errorf("invalid layer checksum")
+
+	// ErrLayerInvalidTarsum is returned when the tarsum check fails.
+	ErrLayerInvalidTarsum = fmt.Errorf("invalid layer tarsum")
+
+	// ErrLayerInvalidLength is returned when the length check fails.
+	ErrLayerInvalidLength = fmt.Errorf("invalid layer length")
+)
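To make the intended call sequence concrete, here is a rough sketch of a client driving a complete upload through LayerService — start, write, finish — with the digest computed inline via a TeeReader. The helper is hypothetical and error handling abbreviated:

package storage

import (
	"crypto/sha256"
	"io"
)

// uploadLayerExample is a hypothetical walkthrough of the LayerService
// contract; it is not part of the diff.
func uploadLayerExample(ls LayerService, name, tarSum string, content io.Reader, size int64) (Layer, error) {
	upload, err := ls.Upload(name, tarSum)
	if err != nil {
		return nil, err // may be ErrLayerExists
	}

	// Hash the content as it streams through to the upload.
	h := sha256.New()
	if _, err := io.Copy(upload, io.TeeReader(content, h)); err != nil {
		upload.Cancel()
		return nil, err
	}

	// Finish validates size and digest against what was actually written.
	return upload.Finish(size, string(NewDigest("sha256", h)))
}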
diff --git a/storage/layer_test.go b/storage/layer_test.go
new file mode 100644
index 000000000..721878102
--- /dev/null
+++ b/storage/layer_test.go
@@ -0,0 +1,450 @@
+package storage
+
+import (
+	"archive/tar"
+	"bytes"
+	"crypto/rand"
+	"crypto/sha256"
+	"fmt"
+	"io"
+	"io/ioutil"
+	mrand "math/rand"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/pkg/tarsum"
+
+	"github.com/docker/docker-registry/storagedriver"
+	"github.com/docker/docker-registry/storagedriver/inmemory"
+)
+
+// TestSimpleLayerUpload covers the layer upload process, exercising common
+// error paths that might be seen during an upload.
+func TestSimpleLayerUpload(t *testing.T) {
+	randomDataReader, tarSum, err := createRandomReader()
+	if err != nil {
+		t.Fatalf("error creating random reader: %v", err)
+	}
+
+	uploadStore, err := newTemporaryLocalFSLayerUploadStore()
+	if err != nil {
+		t.Fatalf("error allocating upload store: %v", err)
+	}
+
+	imageName := "foo/bar"
+	driver := inmemory.New()
+
+	ls := &layerStore{
+		driver: driver,
+		pathMapper: &pathMapper{
+			root:    "/storage/testing",
+			version: storagePathVersion,
+		},
+		uploadStore: uploadStore,
+	}
+
+	h := sha256.New()
+	rd := io.TeeReader(randomDataReader, h)
+
+	layerUpload, err := ls.Upload(imageName, tarSum)
+	if err != nil {
+		t.Fatalf("unexpected error starting layer upload: %s", err)
+	}
+
+	// Cancel the upload then restart it
+	if err := layerUpload.Cancel(); err != nil {
+		t.Fatalf("unexpected error during upload cancellation: %v", err)
+	}
+
+	// Do a resume, get unknown upload
+	layerUpload, err = ls.Resume(imageName, tarSum, layerUpload.UUID())
+	if err != ErrLayerUploadUnknown {
+		t.Fatalf("unexpected error resuming upload, should be unknown: %v", err)
+	}
+
+	// Restart!
+	layerUpload, err = ls.Upload(imageName, tarSum)
+	if err != nil {
+		t.Fatalf("unexpected error starting layer upload: %s", err)
+	}
+
+	// Get the size of our random tarfile
+	randomDataSize, err := seekerSize(randomDataReader)
+	if err != nil {
+		t.Fatalf("error getting seeker size of random data: %v", err)
+	}
+
+	nn, err := io.Copy(layerUpload, rd)
+	if err != nil {
+		t.Fatalf("unexpected error uploading layer data: %v", err)
+	}
+
+	if nn != randomDataSize {
+		t.Fatalf("layer data write incomplete")
+	}
+
+	if layerUpload.Offset() != nn {
+		t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn)
+	}
+	layerUpload.Close()
+
+	// Do a resume, for good fun
+	layerUpload, err = ls.Resume(imageName, tarSum, layerUpload.UUID())
+	if err != nil {
+		t.Fatalf("unexpected error resuming upload: %v", err)
+	}
+
+	digest := NewDigest("sha256", h)
+	layer, err := layerUpload.Finish(randomDataSize, string(digest))
+	if err != nil {
+		t.Fatalf("unexpected error finishing layer upload: %v", err)
+	}
+
+	// After finishing an upload, it should no longer exist.
+	if _, err := ls.Resume(imageName, tarSum, layerUpload.UUID()); err != ErrLayerUploadUnknown {
+		t.Fatalf("expected layer upload to be unknown, got %v", err)
+	}
+
+	// Test for existence.
+	exists, err := ls.Exists(layer.TarSum())
+	if err != nil {
+		t.Fatalf("unexpected error checking for existence: %v", err)
+	}
+
+	if !exists {
+		t.Fatalf("layer should now exist")
+	}
+
+	h.Reset()
+	nn, err = io.Copy(h, layer)
+	if err != nil {
+		t.Fatalf("error reading layer: %v", err)
+	}
+
+	if nn != randomDataSize {
+		t.Fatalf("incorrect read length")
+	}
+
+	if NewDigest("sha256", h) != digest {
+		t.Fatalf("unexpected digest from uploaded layer: %q != %q", NewDigest("sha256", h), digest)
+	}
+}
+
+// TestSimpleLayerRead just creates a simple layer file and ensures that basic
+// open, read, seek, read works. More specific edge cases should be covered in
+// other tests.
+func TestSimpleLayerRead(t *testing.T) {
+	imageName := "foo/bar"
+	driver := inmemory.New()
+	ls := &layerStore{
+		driver: driver,
+		pathMapper: &pathMapper{
+			root:    "/storage/testing",
+			version: storagePathVersion,
+		},
+	}
+
+	randomLayerReader, tarSum, err := createRandomReader()
+	if err != nil {
+		t.Fatalf("error creating random data: %v", err)
+	}
+
+	// Test for existence.
+	exists, err := ls.Exists(tarSum)
+	if err != nil {
+		t.Fatalf("unexpected error checking for existence: %v", err)
+	}
+
+	if exists {
+		t.Fatalf("layer should not exist")
+	}
+
+	// Try to get the layer and make sure we get a not found error
+	layer, err := ls.Fetch(tarSum)
+	if err == nil {
+		t.Fatalf("error expected fetching unknown layer")
+	}
+
+	if err != ErrLayerUnknown {
+		t.Fatalf("unexpected error fetching non-existent layer: %v", err)
+	}
+
+	randomLayerDigest, err := writeTestLayer(driver, ls.pathMapper, imageName, tarSum, randomLayerReader)
+	if err != nil {
+		t.Fatalf("unexpected error writing test layer: %v", err)
+	}
+
+	randomLayerSize, err := seekerSize(randomLayerReader)
+	if err != nil {
+		t.Fatalf("error getting seeker size for random layer: %v", err)
+	}
+
+	layer, err = ls.Fetch(tarSum)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer layer.Close()
+
+	// Now check the sha digest and ensure it's the same
+	h := sha256.New()
+	nn, err := io.Copy(h, layer)
+	if err != nil && err != io.EOF {
+		t.Fatalf("unexpected error copying to hash: %v", err)
+	}
+
+	if nn != randomLayerSize {
+		t.Fatalf("stored incorrect number of bytes in layer: %d != %d", nn, randomLayerSize)
+	}
+
+	digest := NewDigest("sha256", h)
+	if digest != randomLayerDigest {
+		t.Fatalf("fetched digest does not match: %q != %q", digest, randomLayerDigest)
+	}
+
+	// Now seek back to the start of the layer, read the whole thing and check
+	// against randomLayerData
+	offset, err := layer.Seek(0, os.SEEK_SET)
+	if err != nil {
+		t.Fatalf("error seeking layer: %v", err)
+	}
+
+	if offset != 0 {
+		t.Fatalf("seek failed: expected 0 offset, got %d", offset)
+	}
+
+	p, err := ioutil.ReadAll(layer)
+	if err != nil {
+		t.Fatalf("error reading all of layer: %v", err)
+	}
+
+	if len(p) != int(randomLayerSize) {
+		t.Fatalf("layer data read has different length: %v != %v", len(p), randomLayerSize)
+	}
+
+	// Reset the randomLayerReader and read back the buffer
+	if _, err := randomLayerReader.Seek(0, os.SEEK_SET); err != nil {
+		t.Fatalf("error resetting layer reader: %v", err)
+	}
+
+	randomLayerData, err := ioutil.ReadAll(randomLayerReader)
+	if err != nil {
+		t.Fatalf("random layer read failed: %v", err)
+	}
+
+	if !bytes.Equal(p, randomLayerData) {
+		t.Fatalf("layer data not equal")
+	}
+}
+
+func TestLayerReaderSeek(t *testing.T) {
+	// TODO(stevvooe): Ensure that all relative seeks work as advertised.
+	// Readers must close and re-open on command. This is important to support
+	// resumable and concurrent downloads via HTTP range requests.
+}
+
+// TestLayerReadErrors covers the various error return types for different
+// conditions that can arise when reading a layer.
+func TestLayerReadErrors(t *testing.T) {
+	// TODO(stevvooe): We need to cover error return types, driven by the
+	// errors returned via the HTTP API. For now, here is an incomplete list:
+	//
+	// 	1. Layer Not Found: returned when layer is not found or access is
+	//	   denied.
+	// 	2. Layer Unavailable: returned when link references are unresolved,
+	//	   but layer is known to the registry.
+	// 	3. Layer Invalid: This may be split into more errors, but should be
+	//	   returned when the name or tarsum does not reference a valid layer.
+	//	   We may also need something to communicate layer verification errors
+	//	   for the inline tarsum check.
+	// 	4. Timeout: timeouts to backend. Need to better understand these
+	//	   failure cases and how the storage driver propagates these errors
+	//	   up the stack.
+}
+
+// writeRandomLayer creates a random layer under name and tarSum using driver
+// and pathMapper. An io.ReadSeeker with the data is returned, along with the
+// sha256 hex digest.
+func writeRandomLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string) (rs io.ReadSeeker, tarSum string, digest Digest, err error) {
+	reader, tarSum, err := createRandomReader()
+	if err != nil {
+		return nil, "", "", err
+	}
+
+	// Now, actually create the layer.
+	randomLayerDigest, err := writeTestLayer(driver, pathMapper, name, tarSum, ioutil.NopCloser(reader))
+	if err != nil {
+		return nil, "", "", err
+	}
+
+	if _, err := reader.Seek(0, os.SEEK_SET); err != nil {
+		return nil, "", "", err
+	}
+
+	return reader, tarSum, randomLayerDigest, nil
+}
+
+// seekerSize seeks to the end of seeker, checks the size and returns it to
+// the original state, returning the size. The state of the seeker should be
+// treated as unknown if an error is returned.
+func seekerSize(seeker io.ReadSeeker) (int64, error) {
+	current, err := seeker.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return 0, err
+	}
+
+	end, err := seeker.Seek(0, os.SEEK_END)
+	if err != nil {
+		return 0, err
+	}
+
+	resumed, err := seeker.Seek(current, os.SEEK_SET)
+	if err != nil {
+		return 0, err
+	}
+
+	if resumed != current {
+		return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location")
+	}
+
+	return end, nil
+}
+
+// createRandomReader returns a random read seeker and its tarsum. The
+// returned content will be a valid tar file with a random number of files and
+// content.
+func createRandomReader() (rs io.ReadSeeker, tarSum string, err error) {
+	nFiles := mrand.Intn(10) + 10
+	target := &bytes.Buffer{}
+	wr := tar.NewWriter(target)
+
+	// Perturb this on each iteration of the loop below.
+	header := &tar.Header{
+		Mode:       0644,
+		ModTime:    time.Now(),
+		Typeflag:   tar.TypeReg,
+		Uname:      "randocalrissian",
+		Gname:      "cloudcity",
+		AccessTime: time.Now(),
+		ChangeTime: time.Now(),
+	}
+
+	for fileNumber := 0; fileNumber < nFiles; fileNumber++ {
+		fileSize := mrand.Int63n(1<<20) + 1<<20
+
+		header.Name = fmt.Sprint(fileNumber)
+		header.Size = fileSize
+
+		if err := wr.WriteHeader(header); err != nil {
+			return nil, "", err
+		}
+
+		randomData := make([]byte, fileSize)
+
+		// Fill up the buffer with some random data.
+		n, err := rand.Read(randomData)
+		if n != len(randomData) {
+			return nil, "", fmt.Errorf("short read creating random reader: %v bytes != %v bytes", n, len(randomData))
+		}
+
+		if err != nil {
+			return nil, "", err
+		}
+
+		nn, err := io.Copy(wr, bytes.NewReader(randomData))
+		if nn != fileSize {
+			return nil, "", fmt.Errorf("short copy writing random file to tar")
+		}
+
+		if err != nil {
+			return nil, "", err
+		}
+
+		if err := wr.Flush(); err != nil {
+			return nil, "", err
+		}
+	}
+
+	if err := wr.Close(); err != nil {
+		return nil, "", err
+	}
+
+	reader := bytes.NewReader(target.Bytes())
+
+	// A tar builder that supports tarsum inline calculation would be awesome
+	// here.
+	ts, err := tarsum.NewTarSum(reader, true, tarsum.Version1)
+	if err != nil {
+		return nil, "", err
+	}
+
+	nn, err := io.Copy(ioutil.Discard, ts)
+	if nn != int64(len(target.Bytes())) {
+		return nil, "", fmt.Errorf("short copy when getting tarsum of random layer: %v != %v", nn, len(target.Bytes()))
+	}
+
+	if err != nil {
+		return nil, "", err
+	}
+
+	return bytes.NewReader(target.Bytes()), ts.Sum(nil), nil
+}
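Since the tests lean heavily on the tarsum package, a short sketch of the pattern may help: tarsum wraps a tar stream, the stream must be fully drained, and only then does Sum return the checksum string. The helper and file path below are assumptions for illustration:

package storage

import (
	"io"
	"io/ioutil"
	"os"

	"github.com/docker/docker/pkg/tarsum"
)

// tarSumOfFile shows the drain-then-sum pattern used above. Hypothetical
// helper; not part of the diff.
func tarSumOfFile(path string) (string, error) {
	fp, err := os.Open(path) // e.g. a layer tarball
	if err != nil {
		return "", err
	}
	defer fp.Close()

	ts, err := tarsum.NewTarSum(fp, true, tarsum.Version1)
	if err != nil {
		return "", err
	}

	// The sum is only valid after the entire stream has been read.
	if _, err := io.Copy(ioutil.Discard, ts); err != nil {
		return "", err
	}

	return ts.Sum(nil), nil
}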
+
+// writeTestLayer creates a simple test layer in the provided driver under
+// tarsum, returning the string digest. This is implemented piecemeal and
+// should probably be replaced by the uploader when it's ready.
+func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name, tarSum string, content io.Reader) (Digest, error) {
+	h := sha256.New()
+	rd := io.TeeReader(content, h)
+
+	p, err := ioutil.ReadAll(rd)
+	if err != nil {
+		return "", err
+	}
+
+	digest := NewDigest("sha256", h)
+
+	blobPath, err := pathMapper.path(blobPathSpec{
+		alg:    digest.Algorithm(),
+		digest: digest.Hex(),
+	})
+	if err != nil {
+		return "", err
+	}
+
+	if err := driver.PutContent(blobPath, p); err != nil {
+		return "", err
+	}
+
+	layerIndexLinkPath, err := pathMapper.path(layerIndexLinkPathSpec{
+		tarSum: tarSum,
+	})
+	if err != nil {
+		return "", err
+	}
+
+	layerLinkPath, err := pathMapper.path(layerLinkPathSpec{
+		name:   name,
+		tarSum: tarSum,
+	})
+	if err != nil {
+		return "", err
+	}
+
+	if err := driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
+		return "", err
+	}
+
+	if err := driver.PutContent(layerIndexLinkPath, []byte(name)); err != nil {
+		return "", err
+	}
+
+	return digest, nil
+}
diff --git a/storage/layerreader.go b/storage/layerreader.go
new file mode 100644
index 000000000..df05c3675
--- /dev/null
+++ b/storage/layerreader.go
@@ -0,0 +1,172 @@
+package storage
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"os"
+	"time"
+)
+
+// layerReader implements Layer and provides facilities for reading and
+// seeking.
+type layerReader struct {
+	layerStore *layerStore
+	rc         io.ReadCloser
+	brd        *bufio.Reader
+
+	name      string // repo name of this layer
+	tarSum    string
+	path      string
+	createdAt time.Time
+
+	// offset is the current read offset
+	offset int64
+
+	// size is the total layer size, if available.
+	size int64
+
+	closedErr error // terminal error, if set, reader is closed
+}
+
+var _ Layer = &layerReader{}
+
+func (lrs *layerReader) Name() string {
+	return lrs.name
+}
+
+func (lrs *layerReader) TarSum() string {
+	return lrs.tarSum
+}
+
+func (lrs *layerReader) CreatedAt() time.Time {
+	return lrs.createdAt
+}
+
+func (lrs *layerReader) Read(p []byte) (n int, err error) {
+	if err := lrs.closed(); err != nil {
+		return 0, err
+	}
+
+	rd, err := lrs.reader()
+	if err != nil {
+		return 0, err
+	}
+
+	n, err = rd.Read(p)
+	lrs.offset += int64(n)
+
+	// Simulate an io.EOF error if we reach filesize.
+	if err == nil && lrs.offset >= lrs.size {
+		err = io.EOF
+	}
+
+	// TODO(stevvooe): More error checking is required here. If the reader
+	// times out for some reason, we should reset the reader so we re-open the
+	// connection.
+
+	return n, err
+}
+
+func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) {
+	if err := lrs.closed(); err != nil {
+		return 0, err
+	}
+
+	var err error
+	newOffset := lrs.offset
+
+	switch whence {
+	case os.SEEK_CUR:
+		newOffset += offset
+	case os.SEEK_END:
+		newOffset = lrs.size + offset
+	case os.SEEK_SET:
+		newOffset = offset
+	}
+
+	if newOffset < 0 {
+		err = fmt.Errorf("cannot seek to negative position")
+	} else if newOffset >= lrs.size {
+		err = fmt.Errorf("cannot seek past end of layer")
+	} else {
+		if lrs.offset != newOffset {
+			lrs.resetReader()
+		}
+
+		// No problems, set the offset.
+		lrs.offset = newOffset
+	}
+
+	return lrs.offset, err
+}
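The io.ReadSeeker requirement on Layer exists so that handlers can delegate range handling to the standard library. A rough sketch of an HTTP handler serving a Layer follows; the handler itself is hypothetical and not part of the diff:

package storage

import (
	"net/http"
)

// serveLayer shows why Layer embeds io.ReadSeeker: http.ServeContent seeks
// to the end to discover the size and honors Range requests for resumable
// downloads. Hypothetical handler fragment.
func serveLayer(w http.ResponseWriter, r *http.Request, layer Layer) {
	defer layer.Close()

	// Name the content after the tarsum; ServeContent derives Content-Length
	// from the seekable body and CreatedAt may be the zero time, per the
	// Layer documentation.
	http.ServeContent(w, r, layer.TarSum(), layer.CreatedAt(), layer)
}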
+
+// Close the layer. Should be called when the resource is no longer needed.
+func (lrs *layerReader) Close() error {
+	if lrs.closedErr != nil {
+		return lrs.closedErr
+	}
+	// TODO(sday): Must export this error.
+	lrs.closedErr = fmt.Errorf("layer closed")
+
+	// close and release reader chain
+	if lrs.rc != nil {
+		lrs.rc.Close()
+		lrs.rc = nil
+	}
+	lrs.brd = nil
+
+	return nil
+}
+
+// reader prepares the current reader at the lrs offset, ensuring it's
+// buffered and ready to go.
+func (lrs *layerReader) reader() (io.Reader, error) {
+	if err := lrs.closed(); err != nil {
+		return nil, err
+	}
+
+	if lrs.rc != nil {
+		return lrs.brd, nil
+	}
+
+	// If we don't have a reader, open one up.
+	rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset))
+	if err != nil {
+		return nil, err
+	}
+
+	lrs.rc = rc
+
+	if lrs.brd == nil {
+		// TODO(stevvooe): Set an optimal buffer size here. We'll have to
+		// understand the latency characteristics of the underlying network to
+		// set this correctly, so we may want to leave it to the driver. For
+		// out of process drivers, we'll have to optimize this buffer size for
+		// local communication.
+		lrs.brd = bufio.NewReader(lrs.rc)
+	} else {
+		lrs.brd.Reset(lrs.rc)
+	}
+
+	return lrs.brd, nil
+}
+
+// resetReader resets the reader, forcing the read method to open up a new
+// connection and rebuild the buffered reader. This should be called when the
+// offset and the reader will become out of sync, such as during a seek
+// operation.
+func (lrs *layerReader) resetReader() {
+	if err := lrs.closed(); err != nil {
+		return
+	}
+	if lrs.rc != nil {
+		lrs.rc.Close()
+		lrs.rc = nil
+	}
+}
+
+func (lrs *layerReader) closed() error {
+	return lrs.closedErr
+}
diff --git a/storage/layerstore.go b/storage/layerstore.go
new file mode 100644
index 000000000..e2821a839
--- /dev/null
+++ b/storage/layerstore.go
@@ -0,0 +1,203 @@
+package storage
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker-registry/storagedriver"
+)
+
+type layerStore struct {
+	driver      storagedriver.StorageDriver
+	pathMapper  *pathMapper
+	uploadStore layerUploadStore
+}
+
+func (ls *layerStore) Exists(tarSum string) (bool, error) {
+	// Because this implementation just follows blob links, an existence check
+	// is pretty cheap by starting and closing a fetch.
+	_, err := ls.Fetch(tarSum)
+	if err != nil {
+		if err == ErrLayerUnknown {
+			return false, nil
+		}
+
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (ls *layerStore) Fetch(tarSum string) (Layer, error) {
+	repos, err := ls.resolveContainingRepositories(tarSum)
+	if err != nil {
+		// TODO(stevvooe): Unknown tarsum error: need to wrap.
+		return nil, err
+	}
+
+	// TODO(stevvooe): Access control for layer pulls needs to happen here: we
+	// have a list of repos that "own" the tarsum that need to be checked
+	// against the list of repos to which we have pull access. The argument
+	// repos needs to be filtered against that access list.
+
+	name, blobPath, err := ls.resolveBlobPath(repos, tarSum)
+	if err != nil {
+		// TODO(stevvooe): Map this error correctly, perhaps in the callee.
+		return nil, err
+	}
+
+	p, err := ls.pathMapper.path(blobPath)
+	if err != nil {
+		return nil, err
+	}
+
+	// Grab the size of the layer file, ensuring that it exists, among other
+	// things.
+	size, err := ls.driver.CurrentSize(p)
+	if err != nil {
+		// TODO(stevvooe): Handle blob/path does not exist here.
+		// TODO(stevvooe): Get a better understanding of the error cases here
+		// that don't stem from unknown path.
+		return nil, err
+	}
+
+	// Build the layer reader and return to the client.
+	layer := &layerReader{
+		layerStore: ls,
+		path:       p,
+		name:       name,
+		tarSum:     tarSum,
+
+		// TODO(stevvooe): Storage backend does not support modification time
+		// queries yet. Layers "never" change, so just return the zero value.
+		createdAt: time.Time{},
+
+		offset: 0,
+		size:   int64(size),
+	}
+
+	return layer, nil
+}
+
+// Upload begins a layer upload, returning a handle. If the layer upload
+// is already in progress or the layer has already been uploaded, this
+// will return an error.
+func (ls *layerStore) Upload(name, tarSum string) (LayerUpload, error) {
+	exists, err := ls.Exists(tarSum)
+	if err != nil {
+		return nil, err
+	}
+
+	if exists {
+		// TODO(stevvooe): This looks simple now, but we really should only
+		// return the layer exists error when the layer exists AND the current
+		// client has access to the layer. If the client doesn't have access
+		// to the layer, the upload should proceed.
+		return nil, ErrLayerExists
+	}
+
+	// NOTE(stevvooe): Consider the issues with allowing concurrent upload of
+	// the same two layers. Should it be disallowed? For now, we allow both
+	// parties to proceed and the first one uploads the layer.
+
+	lus, err := ls.uploadStore.New(name, tarSum)
+	if err != nil {
+		return nil, err
+	}
+
+	return ls.newLayerUpload(lus), nil
+}
+
+// Resume continues an in-progress layer upload, returning the current
+// state of the upload.
+func (ls *layerStore) Resume(name, tarSum, uuid string) (LayerUpload, error) {
+	lus, err := ls.uploadStore.GetState(uuid)
+	if err != nil {
+		return nil, err
+	}
+
+	return ls.newLayerUpload(lus), nil
+}
+
+// newLayerUpload allocates a new upload controller with the given state.
+func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload {
+	return &layerUploadController{
+		LayerUploadState: lus,
+		layerStore:       ls,
+		uploadStore:      ls.uploadStore,
+	}
+}
+
+func (ls *layerStore) resolveContainingRepositories(tarSum string) ([]string, error) {
+	// Lookup the layer link in the index by tarsum id.
+	layerIndexLinkPath, err := ls.pathMapper.path(layerIndexLinkPathSpec{tarSum: tarSum})
+	if err != nil {
+		return nil, err
+	}
+
+	layerIndexLinkContent, err := ls.driver.GetContent(layerIndexLinkPath)
+	if err != nil {
+		switch err := err.(type) {
+		case storagedriver.PathNotFoundError:
+			return nil, ErrLayerUnknown
+		default:
+			return nil, err
+		}
+	}
+
+	results := strings.Split(string(layerIndexLinkContent), "\n")
+
+	// clean these up
+	for i, result := range results {
+		results[i] = strings.TrimSpace(result)
+	}
+
+	return results, nil
+}
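The layer index link file read above is just a newline-separated list of repository names. A minimal sketch of producing and consuming that format, with invented content for illustration:

package storage

import (
	"fmt"
	"strings"
)

// layerIndexExample demonstrates the newline-separated repo list stored at
// the layer index link path. Hypothetical values; not part of the diff.
func layerIndexExample() {
	// What the registry might store under a tarsum's index link:
	content := []byte("baz/qux\nfoo/bar\n")

	var repos []string
	for _, line := range strings.Split(string(content), "\n") {
		if line = strings.TrimSpace(line); line != "" {
			repos = append(repos, line)
		}
	}

	fmt.Println(repos) // [baz/qux foo/bar]
}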
+
+// resolveBlobPath looks up the tarSum in the various repos to find the blob
+// link, returning the repo name and blob path spec or an error on failure.
+func (ls *layerStore) resolveBlobPath(repos []string, tarSum string) (name string, bps blobPathSpec, err error) {
+	for _, repo := range repos {
+		pathSpec := layerLinkPathSpec{name: repo, tarSum: tarSum}
+		layerLinkPath, err := ls.pathMapper.path(pathSpec)
+		if err != nil {
+			// TODO(stevvooe): This looks very lazy, may want to collect these
+			// errors and report them if we exit this for loop without
+			// resolving the blob id.
+			logrus.Debugf("error building layerLinkPath (%v): %v", pathSpec, err)
+			continue
+		}
+
+		layerLinkContent, err := ls.driver.GetContent(layerLinkPath)
+		if err != nil {
+			logrus.Debugf("error getting layerLink content (%v): %v", pathSpec, err)
+			continue
+		}
+
+		// Yay! We've resolved our blob id and we're ready to go.
+		parts := strings.SplitN(strings.TrimSpace(string(layerLinkContent)), ":", 2)
+		if len(parts) != 2 {
+			return "", bps, fmt.Errorf("invalid blob reference: %q", string(layerLinkContent))
+		}
+
+		bp := blobPathSpec{alg: parts[0], digest: parts[1]}
+
+		return repo, bp, nil
+	}
+
+	// TODO(stevvooe): Map this error to repo not found, but it basically
+	// means we exited the loop above without finding a blob link.
+	return "", bps, fmt.Errorf("unable to resolve blob id for repos=%v and tarSum=%q", repos, tarSum)
+}
diff --git a/storage/layerupload.go b/storage/layerupload.go
new file mode 100644
index 000000000..7ad32d753
--- /dev/null
+++ b/storage/layerupload.go
@@ -0,0 +1,514 @@
+package storage
+
+import (
+	"crypto/sha256"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+
+	"code.google.com/p/go-uuid/uuid"
+
+	"github.com/docker/docker-registry/storagedriver"
+	"github.com/docker/docker/pkg/tarsum"
+)
+
+// LayerUploadState captures the serializable state of the layer upload.
+type LayerUploadState struct {
+	// Name is the primary repository under which the layer will be linked.
+	Name string
+
+	// TarSum identifies the target layer. Provided by the client. If the
+	// resulting tarSum does not match this value, an error should be
+	// returned.
+	TarSum string
+
+	// UUID identifies the upload.
+	UUID string
+
+	// Offset contains the current progress of the upload.
+	Offset int64
+}
+
+// layerUploadController is used to control the various aspects of resumable
+// layer upload. It implements the LayerUpload interface.
+type layerUploadController struct {
+	LayerUploadState
+
+	layerStore  *layerStore
+	uploadStore layerUploadStore
+	fp          layerFile
+	err         error // terminal error, if set, controller is closed
+}
+
+// layerFile documents the interface used while writing layer files, similar
+// to *os.File. This is separate from layerReader, for now, because we want to
+// store uploads on the local file system until we have write-through hashing
+// support. They should be combined once this is worked out.
+type layerFile interface {
+	io.WriteSeeker
+	io.Reader
+	io.Closer
+
+	// Sync commits the contents of the writer to storage.
+	Sync() (err error)
+}
+
+// layerUploadStore provides storage for temporary files and upload state of
+// layers. This is used by the LayerService to manage the state of ongoing
+// uploads. This interface will definitely change and will most likely end up
+// being exported to the app layer. Move it to layer.go when it's ready to go.
+type layerUploadStore interface {
+	New(name, tarSum string) (LayerUploadState, error)
+	Open(uuid string) (layerFile, error)
+	GetState(uuid string) (LayerUploadState, error)
+	SaveState(lus LayerUploadState) error
+	DeleteState(uuid string) error
+}
+
+var _ LayerUpload = &layerUploadController{}
+
+// Name of the repository under which the layer will be linked.
+func (luc *layerUploadController) Name() string {
+	return luc.LayerUploadState.Name
+}
+
+// TarSum identifier of the proposed layer. Resulting data must match this
+// tarsum.
+func (luc *layerUploadController) TarSum() string {
+	return luc.LayerUploadState.TarSum
+}
+
+// UUID returns the identifier for this upload.
+func (luc *layerUploadController) UUID() string {
+	return luc.LayerUploadState.UUID
+}
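Because LayerUploadState is persisted as JSON by the local upload store further down, a quick sketch of the round trip may be useful. All field values here are invented:

package storage

import (
	"encoding/json"
	"fmt"
)

// stateRoundTripExample shows how LayerUploadState serializes to the
// state.json file used by localFSLayerUploadStore. Values are hypothetical;
// not part of the diff.
func stateRoundTripExample() error {
	lus := LayerUploadState{
		Name:   "foo/bar",
		TarSum: "tarsum.v1+sha256:deadbeef", // placeholder, not a real tarsum
		UUID:   "a0a2bf4e-0001-4001-8001-000000000000",
		Offset: 1 << 20,
	}

	p, err := json.Marshal(lus)
	if err != nil {
		return err
	}

	var decoded LayerUploadState
	if err := json.Unmarshal(p, &decoded); err != nil {
		return err
	}

	fmt.Println(decoded.UUID, decoded.Offset) // state survives the round trip
	return nil
}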
+
+// Offset returns the position of the last byte written to this layer.
+func (luc *layerUploadController) Offset() int64 {
+	return luc.LayerUploadState.Offset
+}
+
+// Finish marks the upload as completed, returning a valid handle to the
+// uploaded layer. The final size and checksum are validated against the
+// contents of the uploaded layer. The checksum should be provided in the
+// format <algorithm>:<hex digest>.
+func (luc *layerUploadController) Finish(size int64, digestStr string) (Layer, error) {
+	// This section is going to be pretty ugly now. We will have to read the
+	// file twice. First, to get the tarsum and checksum. When those are
+	// available, and validated, we will upload it to the blob store and link
+	// it into the repository. In the future, we need to use resumable hash
+	// calculations for tarsum and checksum that can be calculated during the
+	// upload. This will allow us to cut the data directly into a temporary
+	// directory in the storage backend.
+
+	fp, err := luc.file()
+	if err != nil {
+		// Cleanup?
+		return nil, err
+	}
+
+	digest, err := ParseDigest(digestStr)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := luc.validateLayer(fp, size, digest); err != nil {
+		// Cleanup?
+		return nil, err
+	}
+
+	if err := luc.writeLayer(fp, size, digest); err != nil {
+		// Cleanup?
+		return nil, err
+	}
+
+	// Yes! We have written some layer data. Let's make it visible. Link the
+	// layer blob into the repository.
+	if err := luc.linkLayer(digest); err != nil {
+		return nil, err
+	}
+
+	// Ok, the upload has completed and finished. Delete the state.
+	if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
+		// Can we ignore this error?
+		return nil, err
+	}
+
+	return luc.layerStore.Fetch(luc.TarSum())
+}
+
+// Cancel the layer upload process.
+func (luc *layerUploadController) Cancel() error {
+	if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
+		return err
+	}
+
+	return luc.Close()
+}
+
+func (luc *layerUploadController) Write(p []byte) (int, error) {
+	wr, err := luc.file()
+	if err != nil {
+		return 0, err
+	}
+
+	n, err := wr.Write(p)
+
+	// Because we expect the reported offset to be consistent with the storage
+	// state, unfortunately, we need to Sync on every call to write.
+	if err := wr.Sync(); err != nil {
+		// Effectively, ignore the write state if the Sync fails. Report that
+		// no bytes were written and seek back to the starting offset.
+		offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
+		if seekErr != nil {
+			// What do we do here? Quite disastrous.
+			luc.reset()
+
+			return 0, fmt.Errorf("multiple errors encountered after Sync + Seek: %v then %v", err, seekErr)
+		}
+
+		if offset != luc.Offset() {
+			return 0, fmt.Errorf("unexpected offset after seek")
+		}
+
+		return 0, err
+	}
+
+	luc.LayerUploadState.Offset += int64(n)
+
+	if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
+		// TODO(stevvooe): This failure case may require more thought.
+		return n, err
+	}
+
+	return n, err
+}
+
+func (luc *layerUploadController) Close() error {
+	if luc.err != nil {
+		return luc.err
+	}
+
+	if luc.fp != nil {
+		luc.err = luc.fp.Close()
+	}
+
+	return luc.err
+}
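As an aside, the layerFile interface is shaped so that *os.File satisfies it directly, which is what the local upload store below relies on. A compile-time assertion makes that dependency explicit; this is a suggestion, not part of the diff:

package storage

import "os"

// *os.File provides Write, Seek, Read, Close and Sync, so it satisfies
// layerFile without an adapter. If the interface ever drifts from *os.File,
// this assertion fails at compile time.
var _ layerFile = (*os.File)(nil)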
+
+func (luc *layerUploadController) file() (layerFile, error) {
+	if luc.fp != nil {
+		return luc.fp, nil
+	}
+
+	fp, err := luc.uploadStore.Open(luc.UUID())
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(stevvooe): We may need a more aggressive check here to ensure that
+	// the file length is equal to the current offset. We may want to sync the
+	// offset before returning the layer upload to the client so it can be
+	// validated before proceeding with any writes.
+
+	// Seek to the current layer offset for good measure.
+	if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
+		return nil, err
+	}
+
+	luc.fp = fp
+
+	return luc.fp, nil
+}
+
+// reset closes and drops the current writer.
+func (luc *layerUploadController) reset() {
+	if luc.fp != nil {
+		luc.fp.Close()
+		luc.fp = nil
+	}
+}
+
+// validateLayer runs several checks on the layer file to ensure its validity.
+// This is currently very expensive and relies on fast io and fast seek.
+func (luc *layerUploadController) validateLayer(fp layerFile, size int64, digest Digest) error {
+	// First, seek to the end of the file, checking the size is as expected.
+	end, err := fp.Seek(0, os.SEEK_END)
+	if err != nil {
+		return err
+	}
+
+	if end != size {
+		return ErrLayerInvalidLength
+	}
+
+	// Now seek back to start and take care of tarsum and checksum.
+	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
+		return err
+	}
+
+	version, err := tarsum.GetVersionFromTarsum(luc.TarSum())
+	if err != nil {
+		return ErrLayerTarSumVersionUnsupported
+	}
+
+	// We only support tarsum version 1 for now.
+	if version != tarsum.Version1 {
+		return ErrLayerTarSumVersionUnsupported
+	}
+
+	ts, err := tarsum.NewTarSum(fp, true, tarsum.Version1)
+	if err != nil {
+		return err
+	}
+
+	h := sha256.New()
+
+	// Pull the layer file through by writing it to a checksum.
+	nn, err := io.Copy(h, ts)
+	if nn != size {
+		return fmt.Errorf("bad read while finishing upload(%s) %v: %v != %v, err=%v", luc.UUID(), fp, nn, size, err)
+	}
+
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	calculatedDigest := NewDigest("sha256", h)
+
+	// Compare the digests!
+	if digest != calculatedDigest {
+		return ErrLayerInvalidChecksum
+	}
+
+	// Compare the tarsums!
+	if ts.Sum(nil) != luc.TarSum() {
+		return ErrLayerInvalidTarsum
+	}
+
+	return nil
+}
+
+// writeLayer actually writes the layer file into its final destination. The
+// layer should be validated before commencing the write.
+func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest Digest) error {
+	blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
+		alg:    digest.Algorithm(),
+		digest: digest.Hex(),
+	})
+	if err != nil {
+		return err
+	}
+
+	// Check for existence
+	if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
+		// TODO(stevvooe): This check is kind of problematic and very racy.
+		switch err := err.(type) {
+		case storagedriver.PathNotFoundError:
+			break // ensure that it doesn't exist.
+		default:
+			// TODO(stevvooe): This isn't actually an error: the blob store is
+			// content addressable and we should just use this to ensure we
+			// have it written. Although, we do need to verify that the
+			// content that is there is the correct length.
+			return err
+		}
+	}
+
+	// Seek our local layer file back now.
+	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
+		// Cleanup?
+		return err
+	}
+
+	// Okay: we can write the file to the blob store.
+	if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// linkLayer links a valid, written layer blob into the registry, first
+// linking the repository namespace, then adding it to the layer index.
+func (luc *layerUploadController) linkLayer(digest Digest) error {
+	layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
+		name:   luc.Name(),
+		tarSum: luc.TarSum(),
+	})
+	if err != nil {
+		return err
+	}
+
+	if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
+		return err
+	}
+
+	// Link the layer into the name index.
+	layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{
+		tarSum: luc.TarSum(),
+	})
+	if err != nil {
+		return err
+	}
+
+	// Read back the name index file. If it doesn't exist, create it. If it
+	// does, add the new repo to the name list.
+
+	// TODO(stevvooe): This is very racy, as well. Reconsider using list for
+	// this operation?
+	layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath)
+	if err != nil {
+		switch err := err.(type) {
+		case storagedriver.PathNotFoundError:
+			layerIndexLinkContent = []byte(luc.Name())
+		default:
+			return err
+		}
+	}
+	layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent)
+
+	// Write the index content back to the index.
+	return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent)
+}
+
+func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte {
+	names := strings.Split(string(content), "\n")
+	var found bool
+	// Search the names and find ours
+	for _, name := range names {
+		if name == luc.Name() {
+			found = true
+		}
+	}
+
+	if !found {
+		names = append(names, luc.Name())
+	}
+
+	sort.Strings(names)
+
+	return []byte(strings.Join(names, "\n"))
+}
+
+// localFSLayerUploadStore implements a local layerUploadStore. There are some
+// complexities around hashsums that make round tripping to the storage
+// backend problematic, so we'll store and read locally for now. By GO-beta,
+// this should be fully implemented on top of the backend storagedriver.
+//
+// For now, the directory layout is as follows:
+//
+//	<root>/registry-layer-upload/
+//		<uuid>/
+//			-> state.json
+//			-> data
+//
+// Each upload, identified by uuid, has its own directory with a state file
+// and a data file. The state file has a json representation of the current
+// state. The data file is the in-progress upload data.
+type localFSLayerUploadStore struct {
+	root string
+}
+
+func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
+	path, err := ioutil.TempDir("", "registry-layer-upload")
+	if err != nil {
+		return nil, err
+	}
+
+	return &localFSLayerUploadStore{
+		root: path,
+	}, nil
+}
+
+func (llufs *localFSLayerUploadStore) New(name, tarSum string) (LayerUploadState, error) {
+	lus := LayerUploadState{
+		Name:   name,
+		TarSum: tarSum,
+		UUID:   uuid.New(),
+	}
+
+	if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
+		return lus, err
+	}
+
+	return lus, nil
+}
+
+func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
+	fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	return fp, nil
+}
+
+func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
+	// TODO(stevvooe): Storing this state on the local file system is an
+	// intermediate stopgap. This technique is unlikely to handle any kind of
+	// concurrency very well.
+
+	var lus LayerUploadState
+	fp, err := os.Open(llufs.path(uuid, "state.json"))
+	if err != nil {
+		if os.IsNotExist(err) {
+			return lus, ErrLayerUploadUnknown
+		}
+
+		return lus, err
+	}
+	defer fp.Close()
+
+	dec := json.NewDecoder(fp)
+	if err := dec.Decode(&lus); err != nil {
+		return lus, err
+	}
+
+	return lus, nil
+}
+
+func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
+	p, err := json.Marshal(lus)
+	if err != nil {
+		return err
+	}
+
+	err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
+	if os.IsNotExist(err) {
+		return ErrLayerUploadUnknown
+	}
+
+	return err
+}
+
+func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
+	if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
+		if os.IsNotExist(err) {
+			return ErrLayerUploadUnknown
+		}
+
+		return err
+	}
+
+	return nil
+}
+
+func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
+	return filepath.Join(llufs.root, uuid, file)
+}
diff --git a/storage/services.go b/storage/services.go
new file mode 100644
index 000000000..dbe5dc75d
--- /dev/null
+++ b/storage/services.go
@@ -0,0 +1,44 @@
+package storage
+
+import (
+	"github.com/docker/docker-registry/storagedriver"
+)
+
+// Services provides various services with application-level operations for
+// use across backend storage drivers.
+type Services struct {
+	driver           storagedriver.StorageDriver
+	pathMapper       *pathMapper
+	layerUploadStore layerUploadStore
+}
+
+// NewServices creates a new Services object to access docker objects stored
+// in the underlying driver.
+func NewServices(driver storagedriver.StorageDriver) *Services {
+	layerUploadStore, err := newTemporaryLocalFSLayerUploadStore()
+	if err != nil {
+		// TODO(stevvooe): This failure needs to be understood in the context
+		// of the lifecycle of the services object, which is uncertain at this
+		// point.
+		panic("unable to allocate layerUploadStore: " + err.Error())
+	}
+
+	return &Services{
+		driver: driver,
+		pathMapper: &pathMapper{
+			// TODO(sday): This should be configurable.
+			root:    "/docker/registry/",
+			version: storagePathVersion,
+		},
+		layerUploadStore: layerUploadStore,
+	}
+}
+
+// Layers returns an instance of the LayerService. Instantiation is cheap and
+// may be context sensitive in the future. The instance should be used similar
+// to a request local.
+func (ss *Services) Layers() LayerService {
+	return &layerStore{
+		driver:      ss.driver,
+		pathMapper:  ss.pathMapper,
+		uploadStore: ss.layerUploadStore,
+	}
+}
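Finally, a sketch of how a caller might wire the whole package together against the in-memory driver; the flow mirrors the tests above and the helper name is invented:

package storage

import (
	"fmt"

	"github.com/docker/docker-registry/storagedriver/inmemory"
)

// servicesExample is a hypothetical end-to-end wiring of the package; it is
// not part of the diff.
func servicesExample(tarSum string) error {
	services := NewServices(inmemory.New())
	layers := services.Layers()

	exists, err := layers.Exists(tarSum)
	if err != nil {
		return err
	}

	if !exists {
		// An upload would start here with layers.Upload(name, tarSum).
		fmt.Println("layer not present; upload required")
	}

	return nil
}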