diff --git a/registry/storage/driver/swift/swift.go b/registry/storage/driver/swift/swift.go index 358ca69fc..e0dada31d 100644 --- a/registry/storage/driver/swift/swift.go +++ b/registry/storage/driver/swift/swift.go @@ -17,6 +17,7 @@ package swift import ( "bytes" + "crypto/md5" "crypto/rand" "crypto/sha1" "crypto/tls" @@ -48,6 +49,12 @@ const defaultChunkSize = 20 * 1024 * 1024 // minChunkSize defines the minimum size of a segment const minChunkSize = 1 << 20 +// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded +var readAfterWriteTimeout = 15 * time.Second + +// readAfterWriteWait defines the time to sleep between two retries +var readAfterWriteWait = 200 * time.Millisecond + // Parameters A struct that encapsulates all of the driver parameters after all values have been set type Parameters struct { Username string @@ -318,6 +325,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea partNumber := 1 chunkSize := int64(d.ChunkSize) zeroBuf := make([]byte, d.ChunkSize) + hash := md5.New() getSegment := func() string { return fmt.Sprintf("%s/%016d", segmentPath, partNumber) @@ -358,18 +366,13 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return 0, err } - if createManifest { - if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { - return 0, err - } - } - // First, we skip the existing segments that are not modified by this call for i := range segments { if offset < cursor+segments[i].Bytes { break } cursor += segments[i].Bytes + hash.Write([]byte(segments[i].Hash)) partNumber++ } @@ -378,7 +381,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea if offset >= currentLength { for offset-currentLength >= chunkSize { // Insert a block a zero - _, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) + headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) if err != nil { if err == swift.ObjectNotFound { return 0, storagedriver.PathNotFoundError{Path: getSegment()} @@ -387,6 +390,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea } currentLength += chunkSize partNumber++ + hash.Write([]byte(headers["Etag"])) } cursor = currentLength @@ -421,13 +425,23 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return false, bytesRead, err } - n, err := io.Copy(currentSegment, multi) + segmentHash := md5.New() + writer := io.MultiWriter(currentSegment, segmentHash) + + n, err := io.Copy(writer, multi) if err != nil { return false, bytesRead, err } if n > 0 { - defer currentSegment.Close() + defer func() { + closeError := currentSegment.Close() + if err != nil { + err = closeError + } + hexHash := hex.EncodeToString(segmentHash.Sum(nil)) + hash.Write([]byte(hexHash)) + }() bytesRead += n - max(0, offset-cursor) } @@ -445,7 +459,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return false, bytesRead, err } - _, copyErr := io.Copy(currentSegment, file) + _, copyErr := io.Copy(writer, file) if err := file.Close(); err != nil { if err == swift.ObjectNotFound { @@ -480,7 +494,35 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea } } - return bytesRead, nil + for ; partNumber < len(segments); partNumber++ { + hash.Write([]byte(segments[partNumber].Hash)) + } + + if createManifest { + if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { + return 0, err + } + } + + expectedHash := hex.EncodeToString(hash.Sum(nil)) + waitingTime := readAfterWriteWait + endTime := time.Now().Add(readAfterWriteTimeout) + for { + var infos swift.Object + if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil { + if strings.Trim(infos.Hash, "\"") == expectedHash { + return bytesRead, nil + } + err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path) + } + if time.Now().Add(waitingTime).After(endTime) { + break + } + time.Sleep(waitingTime) + waitingTime *= 2 + } + + return bytesRead, err } // Stat retrieves the FileInfo for the given path, including the current size