package storage import ( "bytes" "context" "crypto/sha256" "fmt" "io" "path" "reflect" "testing" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/registry/storage/cache/memory" "github.com/distribution/distribution/v3/registry/storage/driver/inmemory" "github.com/distribution/distribution/v3/testutil" "github.com/distribution/reference" "github.com/opencontainers/go-digest" ) // TestWriteSeek tests that the current file size can be // obtained using Seek func TestWriteSeek(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) blobUpload, err := bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting layer upload: %s", err) } contents := []byte{1, 2, 3} blobUpload.Write(contents) blobUpload.Close() offset := blobUpload.Size() if offset != int64(len(contents)) { t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents)) } } // TestSimpleBlobUpload covers the blob upload process, exercising common // error paths that might be seen during an upload. func TestSimpleBlobUpload(t *testing.T) { randomDataReader, dgst, err := testutil.CreateRandomTarFile() if err != nil { t.Fatalf("error creating random reader: %v", err) } ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) h := sha256.New() rd := io.TeeReader(randomDataReader, h) blobUpload, err := bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting layer upload: %s", err) } // Cancel the upload then restart it if err := blobUpload.Cancel(ctx); err != nil { t.Fatalf("unexpected error during upload cancellation: %v", err) } // get the enclosing directory uploadPath := path.Dir(blobUpload.(*blobWriter).path) // ensure state was cleaned up _, err = driver.List(ctx, uploadPath) if err == nil { t.Fatal("files in upload path after cleanup") } // Do a resume, get unknown upload _, err = bs.Resume(ctx, blobUpload.ID()) if err != distribution.ErrBlobUploadUnknown { t.Fatalf("unexpected error resuming upload, should be unknown: %v", err) } // Restart! blobUpload, err = bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting layer upload: %s", err) } // Get the size of our random tarfile randomDataSize, err := seekerSize(randomDataReader) if err != nil { t.Fatalf("error getting seeker size of random data: %v", err) } nn, err := io.Copy(blobUpload, rd) if err != nil { t.Fatalf("unexpected error uploading layer data: %v", err) } if nn != randomDataSize { t.Fatalf("layer data write incomplete") } blobUpload.Close() offset := blobUpload.Size() if offset != nn { t.Fatalf("blobUpload not updated with correct offset: %v != %v", offset, nn) } // Do a resume, for good fun blobUpload, err = bs.Resume(ctx, blobUpload.ID()) if err != nil { t.Fatalf("unexpected error resuming upload: %v", err) } sha256Digest := digest.NewDigest("sha256", h) desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst}) if err != nil { t.Fatalf("unexpected error finishing layer upload: %v", err) } // ensure state was cleaned up uploadPath = path.Dir(blobUpload.(*blobWriter).path) _, err = driver.List(ctx, uploadPath) if err == nil { t.Fatal("files in upload path after commit") } // After finishing an upload, it should no longer exist. if _, err := bs.Resume(ctx, blobUpload.ID()); err != distribution.ErrBlobUploadUnknown { t.Fatalf("expected layer upload to be unknown, got %v", err) } // Test for existence. statDesc, err := bs.Stat(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs) } if !reflect.DeepEqual(statDesc, desc) { t.Fatalf("descriptors not equal: %v != %v", statDesc, desc) } rc, err := bs.Open(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error opening blob for read: %v", err) } defer rc.Close() h.Reset() nn, err = io.Copy(h, rc) if err != nil { t.Fatalf("error reading layer: %v", err) } if nn != randomDataSize { t.Fatalf("incorrect read length") } if digest.NewDigest("sha256", h) != sha256Digest { t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest) } // Delete a blob err = bs.Delete(ctx, desc.Digest) if err != nil { t.Fatalf("Unexpected error deleting blob") } d, err := bs.Stat(ctx, desc.Digest) if err == nil { t.Fatalf("unexpected non-error stating deleted blob: %v", d) } switch err { case distribution.ErrBlobUnknown: break default: t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err) } _, err = bs.Open(ctx, desc.Digest) if err == nil { t.Fatalf("unexpected success opening deleted blob for read") } switch err { case distribution.ErrBlobUnknown: break default: t.Errorf("Unexpected error type getting deleted manifest: %#v", err) } // Re-upload the blob randomBlob, err := io.ReadAll(randomDataReader) if err != nil { t.Fatalf("Error reading all of blob %s", err.Error()) } expectedDigest := digest.FromBytes(randomBlob) simpleUpload(t, bs, randomBlob, expectedDigest) d, err = bs.Stat(ctx, expectedDigest) if err != nil { t.Errorf("unexpected error stat-ing blob") } if d.Digest != expectedDigest { t.Errorf("Mismatching digest with restored blob") } _, err = bs.Open(ctx, expectedDigest) if err != nil { t.Errorf("Unexpected error opening blob") } // Reuse state to test delete with a delete-disabled registry registry, err = NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } repository, err = registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs = repository.Blobs(ctx) err = bs.Delete(ctx, desc.Digest) if err == nil { t.Errorf("Unexpected success deleting while disabled") } } // TestSimpleBlobRead just creates a simple blob file and ensures that basic // open, read, seek, read works. More specific edge cases should be covered in // other tests. func TestSimpleBlobRead(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) randomLayerReader, dgst, err := testutil.CreateRandomTarFile() // TODO(stevvooe): Consider using just a random string. if err != nil { t.Fatalf("error creating random data: %v", err) } // Test for existence. desc, err := bs.Stat(ctx, dgst) if err != distribution.ErrBlobUnknown { t.Fatalf("expected not found error when testing for existence: %v", err) } _, err = bs.Open(ctx, dgst) if err != distribution.ErrBlobUnknown { t.Fatalf("expected not found error when opening non-existent blob: %v", err) } randomLayerSize, err := seekerSize(randomLayerReader) if err != nil { t.Fatalf("error getting seeker size for random layer: %v", err) } descBefore := distribution.Descriptor{Digest: dgst, MediaType: "application/octet-stream", Size: randomLayerSize} t.Logf("desc: %v", descBefore) desc, err = addBlob(ctx, bs, descBefore, randomLayerReader) if err != nil { t.Fatalf("error adding blob to blobservice: %v", err) } if desc.Size != randomLayerSize { t.Fatalf("committed blob has incorrect length: %v != %v", desc.Size, randomLayerSize) } rc, err := bs.Open(ctx, desc.Digest) // note that we are opening with original digest. if err != nil { t.Fatalf("error opening blob with %v: %v", dgst, err) } defer rc.Close() // Now check the sha digest and ensure its the same h := sha256.New() nn, err := io.Copy(h, rc) if err != nil { t.Fatalf("unexpected error copying to hash: %v", err) } if nn != randomLayerSize { t.Fatalf("stored incorrect number of bytes in blob: %d != %d", nn, randomLayerSize) } sha256Digest := digest.NewDigest("sha256", h) if sha256Digest != desc.Digest { t.Fatalf("fetched digest does not match: %q != %q", sha256Digest, desc.Digest) } // Now seek back the blob, read the whole thing and check against randomLayerData offset, err := rc.Seek(0, io.SeekStart) if err != nil { t.Fatalf("error seeking blob: %v", err) } if offset != 0 { t.Fatalf("seek failed: expected 0 offset, got %d", offset) } p, err := io.ReadAll(rc) if err != nil { t.Fatalf("error reading all of blob: %v", err) } if len(p) != int(randomLayerSize) { t.Fatalf("blob data read has different length: %v != %v", len(p), randomLayerSize) } // Reset the randomLayerReader and read back the buffer _, err = randomLayerReader.Seek(0, io.SeekStart) if err != nil { t.Fatalf("error resetting layer reader: %v", err) } randomLayerData, err := io.ReadAll(randomLayerReader) if err != nil { t.Fatalf("random layer read failed: %v", err) } if !bytes.Equal(p, randomLayerData) { t.Fatalf("layer data not equal") } } // TestBlobMount covers the blob mount process, exercising common // error paths that might be seen during a mount. func TestBlobMount(t *testing.T) { randomDataReader, dgst, err := testutil.CreateRandomTarFile() if err != nil { t.Fatalf("error creating random reader: %v", err) } ctx := context.Background() imageName, _ := reference.WithName("foo/bar") sourceImageName, _ := reference.WithName("foo/source") driver := inmemory.New() registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } sourceRepository, err := registry.Repository(ctx, sourceImageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } sbs := sourceRepository.Blobs(ctx) blobUpload, err := sbs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting layer upload: %s", err) } // Get the size of our random tarfile randomDataSize, err := seekerSize(randomDataReader) if err != nil { t.Fatalf("error getting seeker size of random data: %v", err) } _, err = io.Copy(blobUpload, randomDataReader) if err != nil { t.Fatalf("unexpected error uploading layer data: %v", err) } desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst}) if err != nil { t.Fatalf("unexpected error finishing layer upload: %v", err) } // Test for existence. statDesc, err := sbs.Stat(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error checking for existence: %v, %#v", err, sbs) } if !reflect.DeepEqual(statDesc, desc) { t.Fatalf("descriptors not equal: %v != %v", statDesc, desc) } bs := repository.Blobs(ctx) // Test destination for existence. _, err = bs.Stat(ctx, desc.Digest) if err == nil { t.Fatalf("unexpected non-error stating unmounted blob: %v", desc) } canonicalRef, err := reference.WithDigest(sourceRepository.Named(), desc.Digest) if err != nil { t.Fatal(err) } bw, err := bs.Create(ctx, WithMountFrom(canonicalRef)) if bw != nil { t.Fatal("unexpected blobwriter returned from Create call, should mount instead") } ebm, ok := err.(distribution.ErrBlobMounted) if !ok { t.Fatalf("unexpected error mounting layer: %v", err) } if !reflect.DeepEqual(ebm.Descriptor, desc) { t.Fatalf("descriptors not equal: %v != %v", ebm.Descriptor, desc) } // Test for existence. statDesc, err = bs.Stat(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs) } if !reflect.DeepEqual(statDesc, desc) { t.Fatalf("descriptors not equal: %v != %v", statDesc, desc) } rc, err := bs.Open(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error opening blob for read: %v", err) } defer rc.Close() h := sha256.New() nn, err := io.Copy(h, rc) if err != nil { t.Fatalf("error reading layer: %v", err) } if nn != randomDataSize { t.Fatalf("incorrect read length") } if digest.NewDigest("sha256", h) != dgst { t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), dgst) } // Delete the blob from the source repo err = sbs.Delete(ctx, desc.Digest) if err != nil { t.Fatalf("Unexpected error deleting blob") } _, err = bs.Stat(ctx, desc.Digest) if err != nil { t.Fatalf("unexpected error stating blob deleted from source repository: %v", err) } d, err := sbs.Stat(ctx, desc.Digest) if err == nil { t.Fatalf("unexpected non-error stating deleted blob: %v", d) } switch err { case distribution.ErrBlobUnknown: break default: t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err) } // Delete the blob from the dest repo err = bs.Delete(ctx, desc.Digest) if err != nil { t.Fatalf("Unexpected error deleting blob") } d, err = bs.Stat(ctx, desc.Digest) if err == nil { t.Fatalf("unexpected non-error stating deleted blob: %v", d) } switch err { case distribution.ErrBlobUnknown: break default: t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err) } } // TestLayerUploadZeroLength uploads zero-length func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) } bs := repository.Blobs(ctx) simpleUpload(t, bs, []byte{}, digestSha256Empty) } func simpleUpload(t *testing.T, bs distribution.BlobIngester, blob []byte, expectedDigest digest.Digest) { ctx := context.Background() wr, err := bs.Create(ctx) if err != nil { t.Fatalf("unexpected error starting upload: %v", err) } nn, err := io.Copy(wr, bytes.NewReader(blob)) if err != nil { t.Fatalf("error copying into blob writer: %v", err) } if nn != 0 { t.Fatalf("unexpected number of bytes copied: %v > 0", nn) } dgst, err := digest.FromReader(bytes.NewReader(blob)) if err != nil { t.Fatalf("error getting digest: %v", err) } if dgst != expectedDigest { // sanity check on zero digest t.Fatalf("digest not as expected: %v != %v", dgst, expectedDigest) } desc, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}) if err != nil { t.Fatalf("unexpected error committing write: %v", err) } if desc.Digest != dgst { t.Fatalf("unexpected digest: %v != %v", desc.Digest, dgst) } } // seekerSize seeks to the end of seeker, checks the size and returns it to // the original state, returning the size. The state of the seeker should be // treated as unknown if an error is returned. func seekerSize(seeker io.ReadSeeker) (int64, error) { current, err := seeker.Seek(0, io.SeekCurrent) if err != nil { return 0, err } end, err := seeker.Seek(0, io.SeekEnd) if err != nil { return 0, err } resumed, err := seeker.Seek(current, io.SeekStart) if err != nil { return 0, err } if resumed != current { return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location") } return end, nil } // addBlob simply consumes the reader and inserts into the blob service, // returning a descriptor on success. func addBlob(ctx context.Context, bs distribution.BlobIngester, desc distribution.Descriptor, rd io.Reader) (distribution.Descriptor, error) { wr, err := bs.Create(ctx) if err != nil { return distribution.Descriptor{}, err } defer wr.Cancel(ctx) if nn, err := io.Copy(wr, rd); err != nil { return distribution.Descriptor{}, err } else if nn != desc.Size { return distribution.Descriptor{}, fmt.Errorf("incorrect number of bytes copied: %v != %v", nn, desc.Size) } return wr.Commit(ctx, desc) }