package storage import ( "fmt" "io" "os" "path" "strconv" "time" "github.com/Sirupsen/logrus" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" storagedriver "github.com/docker/distribution/registry/storage/driver" ) var _ distribution.LayerUpload = &layerWriter{} // layerWriter is used to control the various aspects of resumable // layer upload. It implements the LayerUpload interface. type layerWriter struct { layerStore *layerStore uuid string startedAt time.Time resumableDigester digest.ResumableDigester // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // LayerUpload Interface bufferedFileWriter } var _ distribution.LayerUpload = &layerWriter{} // UUID returns the identifier for this upload. func (lw *layerWriter) UUID() string { return lw.uuid } func (lw *layerWriter) StartedAt() time.Time { return lw.startedAt } // Finish marks the upload as completed, returning a valid handle to the // uploaded layer. The final size and checksum are validated against the // contents of the uploaded layer. The checksum should be provided in the // format :. func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) { context.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish") if err := lw.bufferedFileWriter.Close(); err != nil { return nil, err } var ( canonical digest.Digest err error ) // HACK(stevvooe): To deal with s3's lack of consistency, attempt to retry // validation on failure. Three attempts are made, backing off // retries*100ms each time. for retries := 0; ; retries++ { canonical, err = lw.validateLayer(dgst) if err == nil { break } context.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries). Errorf("error validating layer: %v", err) if retries < 3 { time.Sleep(100 * time.Millisecond * time.Duration(retries+1)) continue } return nil, err } if err := lw.moveLayer(canonical); err != nil { // TODO(stevvooe): Cleanup? return nil, err } // Link the layer blob into the repository. if err := lw.linkLayer(canonical, dgst); err != nil { return nil, err } if err := lw.removeResources(); err != nil { return nil, err } return lw.layerStore.Fetch(canonical) } // Cancel the layer upload process. func (lw *layerWriter) Cancel() error { context.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel") if err := lw.removeResources(); err != nil { return err } lw.Close() return nil } func (lw *layerWriter) Write(p []byte) (int, error) { if lw.resumableDigester == nil { return lw.bufferedFileWriter.Write(p) } // Ensure that the current write offset matches how many bytes have been // written to the digester. If not, we need to update the digest state to // match the current write position. if err := lw.resumeHashAt(lw.offset); err != nil { return 0, err } return io.MultiWriter(&lw.bufferedFileWriter, lw.resumableDigester).Write(p) } func (lw *layerWriter) ReadFrom(r io.Reader) (n int64, err error) { if lw.resumableDigester == nil { return lw.bufferedFileWriter.ReadFrom(r) } // Ensure that the current write offset matches how many bytes have been // written to the digester. If not, we need to update the digest state to // match the current write position. if err := lw.resumeHashAt(lw.offset); err != nil { return 0, err } return lw.bufferedFileWriter.ReadFrom(io.TeeReader(r, lw.resumableDigester)) } func (lw *layerWriter) Close() error { if lw.err != nil { return lw.err } if lw.resumableDigester != nil { if err := lw.storeHashState(); err != nil { return err } } return lw.bufferedFileWriter.Close() } type hashStateEntry struct { offset int64 path string } // getStoredHashStates returns a slice of hashStateEntries for this upload. func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) { uploadHashStatePathPrefix, err := lw.layerStore.repository.pm.path(uploadHashStatePathSpec{ name: lw.layerStore.repository.Name(), uuid: lw.uuid, alg: lw.resumableDigester.Digest().Algorithm(), list: true, }) if err != nil { return nil, err } paths, err := lw.driver.List(lw.layerStore.repository.ctx, uploadHashStatePathPrefix) if err != nil { if _, ok := err.(storagedriver.PathNotFoundError); !ok { return nil, err } // Treat PathNotFoundError as no entries. paths = nil } hashStateEntries := make([]hashStateEntry, 0, len(paths)) for _, p := range paths { pathSuffix := path.Base(p) // The suffix should be the offset. offset, err := strconv.ParseInt(pathSuffix, 0, 64) if err != nil { logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err) } hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p}) } return hashStateEntries, nil } // resumeHashAt attempts to restore the state of the internal hash function // by loading the most recent saved hash state less than or equal to the given // offset. Any unhashed bytes remaining less than the given offset are hashed // from the content uploaded so far. func (lw *layerWriter) resumeHashAt(offset int64) error { if offset < 0 { return fmt.Errorf("cannot resume hash at negative offset: %d", offset) } if offset == int64(lw.resumableDigester.Len()) { // State of digester is already at the requested offset. return nil } // List hash states from storage backend. var hashStateMatch hashStateEntry hashStates, err := lw.getStoredHashStates() if err != nil { return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err) } ctx := lw.layerStore.repository.ctx // Find the highest stored hashState with offset less than or equal to // the requested offset. for _, hashState := range hashStates { if hashState.offset == offset { hashStateMatch = hashState break // Found an exact offset match. } else if hashState.offset < offset && hashState.offset > hashStateMatch.offset { // This offset is closer to the requested offset. hashStateMatch = hashState } else if hashState.offset > offset { // Remove any stored hash state with offsets higher than this one // as writes to this resumed hasher will make those invalid. This // is probably okay to skip for now since we don't expect anyone to // use the API in this way. For that reason, we don't treat an // an error here as a fatal error, but only log it. if err := lw.driver.Delete(ctx, hashState.path); err != nil { logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err) } } } if hashStateMatch.offset == 0 { // No need to load any state, just reset the hasher. lw.resumableDigester.Reset() } else { storedState, err := lw.driver.GetContent(ctx, hashStateMatch.path) if err != nil { return err } if err = lw.resumableDigester.Restore(storedState); err != nil { return err } } // Mind the gap. if gapLen := offset - int64(lw.resumableDigester.Len()); gapLen > 0 { // Need to read content from the upload to catch up to the desired offset. fr, err := newFileReader(ctx, lw.driver, lw.path) if err != nil { return err } if _, err = fr.Seek(int64(lw.resumableDigester.Len()), os.SEEK_SET); err != nil { return fmt.Errorf("unable to seek to layer reader offset %d: %s", lw.resumableDigester.Len(), err) } if _, err := io.CopyN(lw.resumableDigester, fr, gapLen); err != nil { return err } } return nil } func (lw *layerWriter) storeHashState() error { uploadHashStatePath, err := lw.layerStore.repository.pm.path(uploadHashStatePathSpec{ name: lw.layerStore.repository.Name(), uuid: lw.uuid, alg: lw.resumableDigester.Digest().Algorithm(), offset: int64(lw.resumableDigester.Len()), }) if err != nil { return err } hashState, err := lw.resumableDigester.State() if err != nil { return err } return lw.driver.PutContent(lw.layerStore.repository.ctx, uploadHashStatePath, hashState) } // validateLayer checks the layer data against the digest, returning an error // if it does not match. The canonical digest is returned. func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) { var ( verified, fullHash bool canonical digest.Digest ) if lw.resumableDigester != nil { // Restore the hasher state to the end of the upload. if err := lw.resumeHashAt(lw.size); err != nil { return "", err } canonical = lw.resumableDigester.Digest() if canonical.Algorithm() == dgst.Algorithm() { // Common case: client and server prefer the same canonical digest // algorithm - currently SHA256. verified = dgst == canonical } else { // The client wants to use a different digest algorithm. They'll just // have to be patient and wait for us to download and re-hash the // uploaded content using that digest algorithm. fullHash = true } } else { // Not using resumable digests, so we need to hash the entire layer. fullHash = true } if fullHash { digester := digest.NewCanonicalDigester() digestVerifier, err := digest.NewDigestVerifier(dgst) if err != nil { return "", err } // Read the file from the backend driver and validate it. fr, err := newFileReader(lw.layerStore.repository.ctx, lw.bufferedFileWriter.driver, lw.path) if err != nil { return "", err } tr := io.TeeReader(fr, digester) if _, err = io.Copy(digestVerifier, tr); err != nil { return "", err } canonical = digester.Digest() verified = digestVerifier.Verified() } if !verified { context.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst). Errorf("canonical digest does match provided digest") return "", distribution.ErrLayerInvalidDigest{ Digest: dgst, Reason: fmt.Errorf("content does not match digest"), } } return canonical, nil } // moveLayer moves the data into its final, hash-qualified destination, // identified by dgst. The layer should be validated before commencing the // move. func (lw *layerWriter) moveLayer(dgst digest.Digest) error { blobPath, err := lw.layerStore.repository.pm.path(blobDataPathSpec{ digest: dgst, }) if err != nil { return err } ctx := lw.layerStore.repository.ctx // Check for existence if _, err := lw.driver.Stat(ctx, blobPath); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. default: return err } } else { // If the path exists, we can assume that the content has already // been uploaded, since the blob storage is content-addressable. // While it may be corrupted, detection of such corruption belongs // elsewhere. return nil } // If no data was received, we may not actually have a file on disk. Check // the size here and write a zero-length file to blobPath if this is the // case. For the most part, this should only ever happen with zero-length // tars. if _, err := lw.driver.Stat(ctx, lw.path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // HACK(stevvooe): This is slightly dangerous: if we verify above, // get a hash, then the underlying file is deleted, we risk moving // a zero-length blob into a nonzero-length blob location. To // prevent this horrid thing, we employ the hack of only allowing // to this happen for the zero tarsum. if dgst == digest.DigestSha256EmptyTar { return lw.driver.PutContent(ctx, blobPath, []byte{}) } // We let this fail during the move below. logrus. WithField("upload.uuid", lw.UUID()). WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest") default: return err // unrelated error } } return lw.driver.Move(ctx, lw.path, blobPath) } // linkLayer links a valid, written layer blob into the registry under the // named repository for the upload controller. func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error { dgsts := append([]digest.Digest{canonical}, aliases...) // Don't make duplicate links. seenDigests := make(map[digest.Digest]struct{}, len(dgsts)) for _, dgst := range dgsts { if _, seen := seenDigests[dgst]; seen { continue } seenDigests[dgst] = struct{}{} layerLinkPath, err := lw.layerStore.repository.pm.path(layerLinkPathSpec{ name: lw.layerStore.repository.Name(), digest: dgst, }) if err != nil { return err } ctx := lw.layerStore.repository.ctx if err := lw.layerStore.repository.driver.PutContent(ctx, layerLinkPath, []byte(canonical)); err != nil { return err } } return nil } // removeResources should clean up all resources associated with the upload // instance. An error will be returned if the clean up cannot proceed. If the // resources are already not present, no error will be returned. func (lw *layerWriter) removeResources() error { dataPath, err := lw.layerStore.repository.pm.path(uploadDataPathSpec{ name: lw.layerStore.repository.Name(), uuid: lw.uuid, }) if err != nil { return err } // Resolve and delete the containing directory, which should include any // upload related files. dirPath := path.Dir(dataPath) if err := lw.driver.Delete(lw.layerStore.repository.ctx, dirPath); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: break // already gone! default: // This should be uncommon enough such that returning an error // should be okay. At this point, the upload should be mostly // complete, but perhaps the backend became unaccessible. logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err) return err } } return nil }