package storage import ( "bytes" "crypto/sha256" "fmt" "io" "io/ioutil" "os" "testing" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/registry/storage/cache/memory" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" ) // TestSimpleBlobUpload covers the blob upload process, exercising common // error paths that might be seen during an upload. func TestSimpleBlobUpload(t *testing.T) { randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile() if err != nil { t.Fatalf("error creating random reader: %v", err) } dgst := digest.Digest(tarSumStr) if err != nil { t.Fatalf("error allocating upload store: %v", err) } ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) h := sha256.New() rd := io.TeeReader(randomDataReader, h) blobUpload, err := bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting layer upload: %s", err) } // Cancel the upload then restart it if err := blobUpload.Cancel(ctx); err != nil { t.Fatalf("unexpected error during upload cancellation: %v", err) } // Do a resume, get unknown upload blobUpload, err = bs.Resume(ctx, blobUpload.ID()) if err != distribution.ErrBlobUploadUnknown { t.Fatalf("unexpected error resuming upload, should be unknown: %v", err) } // Restart! blobUpload, err = bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting layer upload: %s", err) } // Get the size of our random tarfile randomDataSize, err := seekerSize(randomDataReader) if err != nil { t.Fatalf("error getting seeker size of random data: %v", err) } nn, err := io.Copy(blobUpload, rd) if err != nil { t.Fatalf("unexpected error uploading layer data: %v", err) } if nn != randomDataSize { t.Fatalf("layer data write incomplete") } offset, err := blobUpload.Seek(0, os.SEEK_CUR) if err != nil { t.Fatalf("unexpected error seeking layer upload: %v", err) } if offset != nn { t.Fatalf("blobUpload not updated with correct offset: %v != %v", offset, nn) } blobUpload.Close() // Do a resume, for good fun blobUpload, err = bs.Resume(ctx, blobUpload.ID()) if err != nil { t.Fatalf("unexpected error resuming upload: %v", err) } sha256Digest := digest.NewDigest("sha256", h) desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst}) if err != nil { t.Fatalf("unexpected error finishing layer upload: %v", err) } // After finishing an upload, it should no longer exist. if _, err := bs.Resume(ctx, blobUpload.ID()); err != distribution.ErrBlobUploadUnknown { t.Fatalf("expected layer upload to be unknown, got %v", err) } // Test for existence. statDesc, err := bs.Stat(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs) } if statDesc != desc { t.Fatalf("descriptors not equal: %v != %v", statDesc, desc) } rc, err := bs.Open(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error opening blob for read: %v", err) } defer rc.Close() h.Reset() nn, err = io.Copy(h, rc) if err != nil { t.Fatalf("error reading layer: %v", err) } if nn != randomDataSize { t.Fatalf("incorrect read length") } if digest.NewDigest("sha256", h) != sha256Digest { t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest) } } // TestSimpleBlobRead just creates a simple blob file and ensures that basic // open, read, seek, read works. More specific edge cases should be covered in // other tests. func TestSimpleBlobRead(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) randomLayerReader, tarSumStr, err := testutil.CreateRandomTarFile() // TODO(stevvooe): Consider using just a random string. if err != nil { t.Fatalf("error creating random data: %v", err) } dgst := digest.Digest(tarSumStr) // Test for existence. desc, err := bs.Stat(ctx, dgst) if err != distribution.ErrBlobUnknown { t.Fatalf("expected not found error when testing for existence: %v", err) } rc, err := bs.Open(ctx, dgst) if err != distribution.ErrBlobUnknown { t.Fatalf("expected not found error when opening non-existent blob: %v", err) } randomLayerSize, err := seekerSize(randomLayerReader) if err != nil { t.Fatalf("error getting seeker size for random layer: %v", err) } descBefore := distribution.Descriptor{Digest: dgst, MediaType: "application/octet-stream", Size: randomLayerSize} t.Logf("desc: %v", descBefore) desc, err = addBlob(ctx, bs, descBefore, randomLayerReader) if err != nil { t.Fatalf("error adding blob to blobservice: %v", err) } if desc.Size != randomLayerSize { t.Fatalf("committed blob has incorrect length: %v != %v", desc.Size, randomLayerSize) } rc, err = bs.Open(ctx, desc.Digest) // note that we are opening with original digest. if err != nil { t.Fatalf("error opening blob with %v: %v", dgst, err) } defer rc.Close() // Now check the sha digest and ensure its the same h := sha256.New() nn, err := io.Copy(h, rc) if err != nil { t.Fatalf("unexpected error copying to hash: %v", err) } if nn != randomLayerSize { t.Fatalf("stored incorrect number of bytes in blob: %d != %d", nn, randomLayerSize) } sha256Digest := digest.NewDigest("sha256", h) if sha256Digest != desc.Digest { t.Fatalf("fetched digest does not match: %q != %q", sha256Digest, desc.Digest) } // Now seek back the blob, read the whole thing and check against randomLayerData offset, err := rc.Seek(0, os.SEEK_SET) if err != nil { t.Fatalf("error seeking blob: %v", err) } if offset != 0 { t.Fatalf("seek failed: expected 0 offset, got %d", offset) } p, err := ioutil.ReadAll(rc) if err != nil { t.Fatalf("error reading all of blob: %v", err) } if len(p) != int(randomLayerSize) { t.Fatalf("blob data read has different length: %v != %v", len(p), randomLayerSize) } // Reset the randomLayerReader and read back the buffer _, err = randomLayerReader.Seek(0, os.SEEK_SET) if err != nil { t.Fatalf("error resetting layer reader: %v", err) } randomLayerData, err := ioutil.ReadAll(randomLayerReader) if err != nil { t.Fatalf("random layer read failed: %v", err) } if !bytes.Equal(p, randomLayerData) { t.Fatalf("layer data not equal") } } // TestLayerUploadZeroLength uploads zero-length func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) wr, err := bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting upload: %v", err) } nn, err := io.Copy(wr, bytes.NewReader([]byte{})) if err != nil { t.Fatalf("error copying into blob writer: %v", err) } if nn != 0 { t.Fatalf("unexpected number of bytes copied: %v > 0", nn) } dgst, err := digest.FromReader(bytes.NewReader([]byte{})) if err != nil { t.Fatalf("error getting zero digest: %v", err) } if dgst != digest.DigestSha256EmptyTar { // sanity check on zero digest t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar) } desc, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}) if err != nil { t.Fatalf("unexpected error committing write: %v", err) } if desc.Digest != dgst { t.Fatalf("unexpected digest: %v != %v", desc.Digest, dgst) } } // seekerSize seeks to the end of seeker, checks the size and returns it to // the original state, returning the size. The state of the seeker should be // treated as unknown if an error is returned. func seekerSize(seeker io.ReadSeeker) (int64, error) { current, err := seeker.Seek(0, os.SEEK_CUR) if err != nil { return 0, err } end, err := seeker.Seek(0, os.SEEK_END) if err != nil { return 0, err } resumed, err := seeker.Seek(current, os.SEEK_SET) if err != nil { return 0, err } if resumed != current { return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location") } return end, nil } // addBlob simply consumes the reader and inserts into the blob service, // returning a descriptor on success. func addBlob(ctx context.Context, bs distribution.BlobIngester, desc distribution.Descriptor, rd io.Reader) (distribution.Descriptor, error) { wr, err := bs.Create(ctx) if err != nil { return distribution.Descriptor{}, err } defer wr.Cancel(ctx) if nn, err := io.Copy(wr, rd); err != nil { return distribution.Descriptor{}, err } else if nn != desc.Size { return distribution.Descriptor{}, fmt.Errorf("incorrect number of bytes copied: %v != %v", nn, desc.Size) } return wr.Commit(ctx, desc) }