From 5a74b806f092ad2877caebee42e19bf495230ed1 Mon Sep 17 00:00:00 2001 From: mlmhl <409107750@qq.com> Date: Mon, 4 Dec 2017 11:31:10 +0800 Subject: [PATCH] update github.com/ncw/swift package in vendor to avoid potential memory leaks Signed-off-by: mlmhl <409107750@qq.com> --- vendor.conf | 2 +- vendor/github.com/ncw/swift/README.md | 2 + vendor/github.com/ncw/swift/auth_v3.go | 3 +- vendor/github.com/ncw/swift/dlo.go | 136 +++++ vendor/github.com/ncw/swift/largeobjects.go | 448 +++++++++++++++++ vendor/github.com/ncw/swift/slo.go | 168 +++++++ vendor/github.com/ncw/swift/swift.go | 469 ++++++++++++++---- .../github.com/ncw/swift/swifttest/server.go | 274 ++++++++-- vendor/github.com/ncw/swift/timeout_reader.go | 4 +- .../github.com/ncw/swift/watchdog_reader.go | 43 +- 10 files changed, 1387 insertions(+), 162 deletions(-) create mode 100644 vendor/github.com/ncw/swift/dlo.go create mode 100644 vendor/github.com/ncw/swift/largeobjects.go create mode 100644 vendor/github.com/ncw/swift/slo.go diff --git a/vendor.conf b/vendor.conf index 31bb03422..bcb2d1f0c 100644 --- a/vendor.conf +++ b/vendor.conf @@ -23,11 +23,11 @@ github.com/satori/go.uuid f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/miekg/dns 271c58e0c14f552178ea321a545ff9af38930f39 github.com/mitchellh/mapstructure 482a9fd5fa83e8c4e7817413b80f3eb8feec03ef -github.com/ncw/swift b964f2ca856aac39885e258ad25aec08d5f64ee6 github.com/prometheus/client_golang c332b6f63c0658a65eca15c0e5247ded801cf564 github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c github.com/prometheus/common 89604d197083d4781071d3c65855d24ecfb0a563 github.com/prometheus/procfs cb4147076ac75738c9a7d279075a253c0cc5acbd +github.com/ncw/swift c95c6e5c2d1a3d37fc44c8c6dc9e231c7500667d github.com/spf13/cobra 312092086bed4968099259622145a0c9ae280064 github.com/spf13/pflag 5644820622454e71517561946e3d94b9f9db6842 github.com/stevvooe/resumable 2aaf90b2ceea5072cb503ef2a620b08ff3119870 diff --git a/vendor/github.com/ncw/swift/README.md b/vendor/github.com/ncw/swift/README.md index 2cc24cffa..5094a4675 100644 --- a/vendor/github.com/ncw/swift/README.md +++ b/vendor/github.com/ncw/swift/README.md @@ -138,3 +138,5 @@ Contributors - Cezar Sa Espinola - Sam Gunaratne - Richard Scothern +- Michel Couillard +- Christopher Waldon diff --git a/vendor/github.com/ncw/swift/auth_v3.go b/vendor/github.com/ncw/swift/auth_v3.go index 21e96718b..64dcabc9b 100644 --- a/vendor/github.com/ncw/swift/auth_v3.go +++ b/vendor/github.com/ncw/swift/auth_v3.go @@ -117,7 +117,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) { v3 := v3AuthRequest{} - if c.UserName == "" { + if c.UserName == "" && c.UserId == "" { v3.Auth.Identity.Methods = []string{v3AuthMethodToken} v3.Auth.Identity.Token = &v3AuthToken{Id: c.ApiKey} } else { @@ -125,6 +125,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) { v3.Auth.Identity.Password = &v3AuthPassword{ User: v3User{ Name: c.UserName, + Id: c.UserId, Password: c.ApiKey, }, } diff --git a/vendor/github.com/ncw/swift/dlo.go b/vendor/github.com/ncw/swift/dlo.go new file mode 100644 index 000000000..e2e2aa97e --- /dev/null +++ b/vendor/github.com/ncw/swift/dlo.go @@ -0,0 +1,136 @@ +package swift + +import ( + "os" +) + +// DynamicLargeObjectCreateFile represents an open static large object +type DynamicLargeObjectCreateFile struct { + largeObjectCreateFile +} + +// DynamicLargeObjectCreateFile creates a dynamic large object +// returning an object which satisfies io.Writer, io.Seeker, io.Closer +// and io.ReaderFrom. The flags are as passes to the +// largeObjectCreate method. +func (c *Connection) DynamicLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) { + lo, err := c.largeObjectCreate(opts) + if err != nil { + return nil, err + } + + return withBuffer(opts, &DynamicLargeObjectCreateFile{ + largeObjectCreateFile: *lo, + }), nil +} + +// DynamicLargeObjectCreate creates or truncates an existing dynamic +// large object returning a writeable object. This sets opts.Flags to +// an appropriate value before calling DynamicLargeObjectCreateFile +func (c *Connection) DynamicLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) { + opts.Flags = os.O_TRUNC | os.O_CREATE + return c.DynamicLargeObjectCreateFile(opts) +} + +// DynamicLargeObjectDelete deletes a dynamic large object and all of its segments. +func (c *Connection) DynamicLargeObjectDelete(container string, path string) error { + return c.LargeObjectDelete(container, path) +} + +// DynamicLargeObjectMove moves a dynamic large object from srcContainer, srcObjectName to dstContainer, dstObjectName +func (c *Connection) DynamicLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error { + info, headers, err := c.Object(dstContainer, srcObjectName) + if err != nil { + return err + } + + segmentContainer, segmentPath := parseFullPath(headers["X-Object-Manifest"]) + if err := c.createDLOManifest(dstContainer, dstObjectName, segmentContainer+"/"+segmentPath, info.ContentType); err != nil { + return err + } + + if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil { + return err + } + + return nil +} + +// createDLOManifest creates a dynamic large object manifest +func (c *Connection) createDLOManifest(container string, objectName string, prefix string, contentType string) error { + headers := make(Headers) + headers["X-Object-Manifest"] = prefix + manifest, err := c.ObjectCreate(container, objectName, false, "", contentType, headers) + if err != nil { + return err + } + + if err := manifest.Close(); err != nil { + return err + } + + return nil +} + +// Close satisfies the io.Closer interface +func (file *DynamicLargeObjectCreateFile) Close() error { + return file.Flush() +} + +func (file *DynamicLargeObjectCreateFile) Flush() error { + err := file.conn.createDLOManifest(file.container, file.objectName, file.segmentContainer+"/"+file.prefix, file.contentType) + if err != nil { + return err + } + return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size()) +} + +func (c *Connection) getAllDLOSegments(segmentContainer, segmentPath string) ([]Object, error) { + //a simple container listing works 99.9% of the time + segments, err := c.ObjectsAll(segmentContainer, &ObjectsOpts{Prefix: segmentPath}) + if err != nil { + return nil, err + } + + hasObjectName := make(map[string]struct{}) + for _, segment := range segments { + hasObjectName[segment.Name] = struct{}{} + } + + //The container listing might be outdated (i.e. not contain all existing + //segment objects yet) because of temporary inconsistency (Swift is only + //eventually consistent!). Check its completeness. + segmentNumber := 0 + for { + segmentNumber++ + segmentName := getSegment(segmentPath, segmentNumber) + if _, seen := hasObjectName[segmentName]; seen { + continue + } + + //This segment is missing in the container listing. Use a more reliable + //request to check its existence. (HEAD requests on segments are + //guaranteed to return the correct metadata, except for the pathological + //case of an outage of large parts of the Swift cluster or its network, + //since every segment is only written once.) + segment, _, err := c.Object(segmentContainer, segmentName) + switch err { + case nil: + //found new segment -> add it in the correct position and keep + //going, more might be missing + if segmentNumber <= len(segments) { + segments = append(segments[:segmentNumber], segments[segmentNumber-1:]...) + segments[segmentNumber-1] = segment + } else { + segments = append(segments, segment) + } + continue + case ObjectNotFound: + //This segment is missing. Since we upload segments sequentially, + //there won't be any more segments after it. + return segments, nil + default: + return nil, err //unexpected error + } + } +} diff --git a/vendor/github.com/ncw/swift/largeobjects.go b/vendor/github.com/ncw/swift/largeobjects.go new file mode 100644 index 000000000..bec640b00 --- /dev/null +++ b/vendor/github.com/ncw/swift/largeobjects.go @@ -0,0 +1,448 @@ +package swift + +import ( + "bufio" + "bytes" + "crypto/rand" + "crypto/sha1" + "encoding/hex" + "errors" + "fmt" + "io" + "os" + gopath "path" + "strconv" + "strings" + "time" +) + +// NotLargeObject is returned if an operation is performed on an object which isn't large. +var NotLargeObject = errors.New("Not a large object") + +// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded +var readAfterWriteTimeout = 15 * time.Second + +// readAfterWriteWait defines the time to sleep between two retries +var readAfterWriteWait = 200 * time.Millisecond + +// largeObjectCreateFile represents an open static or dynamic large object +type largeObjectCreateFile struct { + conn *Connection + container string + objectName string + currentLength int64 + filePos int64 + chunkSize int64 + segmentContainer string + prefix string + contentType string + checkHash bool + segments []Object + headers Headers + minChunkSize int64 +} + +func swiftSegmentPath(path string) (string, error) { + checksum := sha1.New() + random := make([]byte, 32) + if _, err := rand.Read(random); err != nil { + return "", err + } + path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...))) + return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil +} + +func getSegment(segmentPath string, partNumber int) string { + return fmt.Sprintf("%s/%016d", segmentPath, partNumber) +} + +func parseFullPath(manifest string) (container string, prefix string) { + components := strings.SplitN(manifest, "/", 2) + container = components[0] + if len(components) > 1 { + prefix = components[1] + } + return container, prefix +} + +func (headers Headers) IsLargeObjectDLO() bool { + _, isDLO := headers["X-Object-Manifest"] + return isDLO +} + +func (headers Headers) IsLargeObjectSLO() bool { + _, isSLO := headers["X-Static-Large-Object"] + return isSLO +} + +func (headers Headers) IsLargeObject() bool { + return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO() +} + +func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) { + if manifest, isDLO := headers["X-Object-Manifest"]; isDLO { + segmentContainer, segmentPath := parseFullPath(manifest) + segments, err := c.getAllDLOSegments(segmentContainer, segmentPath) + return segmentContainer, segments, err + } + if headers.IsLargeObjectSLO() { + return c.getAllSLOSegments(container, path) + } + return "", nil, NotLargeObject +} + +// LargeObjectOpts describes how a large object should be created +type LargeObjectOpts struct { + Container string // Name of container to place object + ObjectName string // Name of object + Flags int // Creation flags + CheckHash bool // If set Check the hash + Hash string // If set use this hash to check + ContentType string // Content-Type of the object + Headers Headers // Additional headers to upload the object with + ChunkSize int64 // Size of chunks of the object, defaults to 10MB if not set + MinChunkSize int64 // Minimum chunk size, automatically set for SLO's based on info + SegmentContainer string // Name of the container to place segments + SegmentPrefix string // Prefix to use for the segments + NoBuffer bool // Prevents using a bufio.Writer to write segments +} + +type LargeObjectFile interface { + io.Writer + io.Seeker + io.Closer + Size() int64 + Flush() error +} + +// largeObjectCreate creates a large object at opts.Container, opts.ObjectName. +// +// opts.Flags can have the following bits set +// os.TRUNC - remove the contents of the large object if it exists +// os.APPEND - write at the end of the large object +func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) { + var ( + segmentPath string + segmentContainer string + segments []Object + currentLength int64 + err error + ) + + if opts.SegmentPrefix != "" { + segmentPath = opts.SegmentPrefix + } else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil { + return nil, err + } + + if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil { + if opts.Flags&os.O_TRUNC != 0 { + c.LargeObjectDelete(opts.Container, opts.ObjectName) + } else { + currentLength = info.Bytes + if headers.IsLargeObject() { + segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers) + if err != nil { + return nil, err + } + if len(segments) > 0 { + segmentPath = gopath.Dir(segments[0].Name) + } + } else { + if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil { + return nil, err + } + segments = append(segments, info) + } + } + } else if err != ObjectNotFound { + return nil, err + } + + // segmentContainer is not empty when the manifest already existed + if segmentContainer == "" { + if opts.SegmentContainer != "" { + segmentContainer = opts.SegmentContainer + } else { + segmentContainer = opts.Container + "_segments" + } + } + + file := &largeObjectCreateFile{ + conn: c, + checkHash: opts.CheckHash, + container: opts.Container, + objectName: opts.ObjectName, + chunkSize: opts.ChunkSize, + minChunkSize: opts.MinChunkSize, + headers: opts.Headers, + segmentContainer: segmentContainer, + prefix: segmentPath, + segments: segments, + currentLength: currentLength, + } + + if file.chunkSize == 0 { + file.chunkSize = 10 * 1024 * 1024 + } + + if file.minChunkSize > file.chunkSize { + file.chunkSize = file.minChunkSize + } + + if opts.Flags&os.O_APPEND != 0 { + file.filePos = currentLength + } + + return file, nil +} + +// LargeObjectDelete deletes the large object named by container, path +func (c *Connection) LargeObjectDelete(container string, objectName string) error { + _, headers, err := c.Object(container, objectName) + if err != nil { + return err + } + + var objects [][]string + if headers.IsLargeObject() { + segmentContainer, segments, err := c.getAllSegments(container, objectName, headers) + if err != nil { + return err + } + for _, obj := range segments { + objects = append(objects, []string{segmentContainer, obj.Name}) + } + } + objects = append(objects, []string{container, objectName}) + + info, err := c.cachedQueryInfo() + if err == nil && info.SupportsBulkDelete() && len(objects) > 0 { + filenames := make([]string, len(objects)) + for i, obj := range objects { + filenames[i] = obj[0] + "/" + obj[1] + } + _, err = c.doBulkDelete(filenames) + // Don't fail on ObjectNotFound because eventual consistency + // makes this situation normal. + if err != nil && err != Forbidden && err != ObjectNotFound { + return err + } + } else { + for _, obj := range objects { + if err := c.ObjectDelete(obj[0], obj[1]); err != nil { + return err + } + } + } + + return nil +} + +// LargeObjectGetSegments returns all the segments that compose an object +// If the object is a Dynamic Large Object (DLO), it just returns the objects +// that have the prefix as indicated by the manifest. +// If the object is a Static Large Object (SLO), it retrieves the JSON content +// of the manifest and return all the segments of it. +func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) { + _, headers, err := c.Object(container, path) + if err != nil { + return "", nil, err + } + + return c.getAllSegments(container, path, headers) +} + +// Seek sets the offset for the next write operation +func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) { + switch whence { + case 0: + file.filePos = offset + case 1: + file.filePos += offset + case 2: + file.filePos = file.currentLength + offset + default: + return -1, fmt.Errorf("invalid value for whence") + } + if file.filePos < 0 { + return -1, fmt.Errorf("negative offset") + } + return file.filePos, nil +} + +func (file *largeObjectCreateFile) Size() int64 { + return file.currentLength +} + +func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) { + endTimer := time.NewTimer(readAfterWriteTimeout) + defer endTimer.Stop() + waitingTime := readAfterWriteWait + for { + var headers Headers + var sz int64 + if headers, sz, err = fn(); err == nil { + if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz { + return + } + } else { + return + } + waitTimer := time.NewTimer(waitingTime) + select { + case <-endTimer.C: + waitTimer.Stop() + err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz) + return + case <-waitTimer.C: + waitingTime *= 2 + } + } +} + +func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) { + err = withLORetry(expectedSize, func() (Headers, int64, error) { + var info Object + var headers Headers + info, headers, err = c.objectBase(container, objectName) + if err != nil { + return headers, 0, err + } + return headers, info.Bytes, nil + }) + return +} + +// Write satisfies the io.Writer interface +func (file *largeObjectCreateFile) Write(buf []byte) (int, error) { + var sz int64 + var relativeFilePos int + writeSegmentIdx := 0 + for i, obj := range file.segments { + if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) { + relativeFilePos = int(file.filePos - sz) + break + } + writeSegmentIdx++ + sz += obj.Bytes + } + sizeToWrite := len(buf) + for offset := 0; offset < sizeToWrite; { + newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos) + if err != nil { + return 0, err + } + if writeSegmentIdx < len(file.segments) { + file.segments[writeSegmentIdx] = *newSegment + } else { + file.segments = append(file.segments, *newSegment) + } + offset += n + writeSegmentIdx++ + relativeFilePos = 0 + } + file.filePos += int64(sizeToWrite) + file.currentLength = 0 + for _, obj := range file.segments { + file.currentLength += obj.Bytes + } + return sizeToWrite, nil +} + +func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) { + var ( + readers []io.Reader + existingSegment *Object + segmentSize int + ) + segmentName := getSegment(file.prefix, writeSegmentIdx+1) + sizeToRead := int(file.chunkSize) + if writeSegmentIdx < len(file.segments) { + existingSegment = &file.segments[writeSegmentIdx] + if writeSegmentIdx != len(file.segments)-1 { + sizeToRead = int(existingSegment.Bytes) + } + if relativeFilePos > 0 { + headers := make(Headers) + headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10) + existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers) + if err != nil { + return nil, 0, err + } + defer existingSegmentReader.Close() + sizeToRead -= relativeFilePos + segmentSize += relativeFilePos + readers = []io.Reader{existingSegmentReader} + } + } + if sizeToRead > len(buf) { + sizeToRead = len(buf) + } + segmentSize += sizeToRead + readers = append(readers, bytes.NewReader(buf[:sizeToRead])) + if existingSegment != nil && segmentSize < int(existingSegment.Bytes) { + headers := make(Headers) + headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-" + tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers) + if err != nil { + return nil, 0, err + } + defer tailSegmentReader.Close() + segmentSize = int(existingSegment.Bytes) + readers = append(readers, tailSegmentReader) + } + segmentReader := io.MultiReader(readers...) + headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil) + if err != nil { + return nil, 0, err + } + return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil +} + +func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile { + if !opts.NoBuffer { + return &bufferedLargeObjectFile{ + LargeObjectFile: lo, + bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)), + } + } + return lo +} + +type bufferedLargeObjectFile struct { + LargeObjectFile + bw *bufio.Writer +} + +func (blo *bufferedLargeObjectFile) Close() error { + err := blo.bw.Flush() + if err != nil { + return err + } + return blo.LargeObjectFile.Close() +} + +func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) { + return blo.bw.Write(p) +} + +func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) { + err := blo.bw.Flush() + if err != nil { + return 0, err + } + return blo.LargeObjectFile.Seek(offset, whence) +} + +func (blo *bufferedLargeObjectFile) Size() int64 { + return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered()) +} + +func (blo *bufferedLargeObjectFile) Flush() error { + err := blo.bw.Flush() + if err != nil { + return err + } + return blo.LargeObjectFile.Flush() +} diff --git a/vendor/github.com/ncw/swift/slo.go b/vendor/github.com/ncw/swift/slo.go new file mode 100644 index 000000000..4d94aa764 --- /dev/null +++ b/vendor/github.com/ncw/swift/slo.go @@ -0,0 +1,168 @@ +package swift + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/url" + "os" +) + +// StaticLargeObjectCreateFile represents an open static large object +type StaticLargeObjectCreateFile struct { + largeObjectCreateFile +} + +var SLONotSupported = errors.New("SLO not supported") + +type swiftSegment struct { + Path string `json:"path,omitempty"` + Etag string `json:"etag,omitempty"` + Size int64 `json:"size_bytes,omitempty"` + // When uploading a manifest, the attributes must be named `path`, `etag` and `size_bytes` + // but when querying the JSON content of a manifest with the `multipart-manifest=get` + // parameter, Swift names those attributes `name`, `hash` and `bytes`. + // We use all the different attributes names in this structure to be able to use + // the same structure for both uploading and retrieving. + Name string `json:"name,omitempty"` + Hash string `json:"hash,omitempty"` + Bytes int64 `json:"bytes,omitempty"` + ContentType string `json:"content_type,omitempty"` + LastModified string `json:"last_modified,omitempty"` +} + +// StaticLargeObjectCreateFile creates a static large object returning +// an object which satisfies io.Writer, io.Seeker, io.Closer and +// io.ReaderFrom. The flags are as passed to the largeObjectCreate +// method. +func (c *Connection) StaticLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) { + info, err := c.cachedQueryInfo() + if err != nil || !info.SupportsSLO() { + return nil, SLONotSupported + } + realMinChunkSize := info.SLOMinSegmentSize() + if realMinChunkSize > opts.MinChunkSize { + opts.MinChunkSize = realMinChunkSize + } + lo, err := c.largeObjectCreate(opts) + if err != nil { + return nil, err + } + return withBuffer(opts, &StaticLargeObjectCreateFile{ + largeObjectCreateFile: *lo, + }), nil +} + +// StaticLargeObjectCreate creates or truncates an existing static +// large object returning a writeable object. This sets opts.Flags to +// an appropriate value before calling StaticLargeObjectCreateFile +func (c *Connection) StaticLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) { + opts.Flags = os.O_TRUNC | os.O_CREATE + return c.StaticLargeObjectCreateFile(opts) +} + +// StaticLargeObjectDelete deletes a static large object and all of its segments. +func (c *Connection) StaticLargeObjectDelete(container string, path string) error { + info, err := c.cachedQueryInfo() + if err != nil || !info.SupportsSLO() { + return SLONotSupported + } + return c.LargeObjectDelete(container, path) +} + +// StaticLargeObjectMove moves a static large object from srcContainer, srcObjectName to dstContainer, dstObjectName +func (c *Connection) StaticLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error { + swiftInfo, err := c.cachedQueryInfo() + if err != nil || !swiftInfo.SupportsSLO() { + return SLONotSupported + } + info, headers, err := c.Object(srcContainer, srcObjectName) + if err != nil { + return err + } + + container, segments, err := c.getAllSegments(srcContainer, srcObjectName, headers) + if err != nil { + return err + } + + if err := c.createSLOManifest(dstContainer, dstObjectName, info.ContentType, container, segments); err != nil { + return err + } + + if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil { + return err + } + + return nil +} + +// createSLOManifest creates a static large object manifest +func (c *Connection) createSLOManifest(container string, path string, contentType string, segmentContainer string, segments []Object) error { + sloSegments := make([]swiftSegment, len(segments)) + for i, segment := range segments { + sloSegments[i].Path = fmt.Sprintf("%s/%s", segmentContainer, segment.Name) + sloSegments[i].Etag = segment.Hash + sloSegments[i].Size = segment.Bytes + } + + content, err := json.Marshal(sloSegments) + if err != nil { + return err + } + + values := url.Values{} + values.Set("multipart-manifest", "put") + if _, err := c.objectPut(container, path, bytes.NewBuffer(content), false, "", contentType, nil, values); err != nil { + return err + } + + return nil +} + +func (file *StaticLargeObjectCreateFile) Close() error { + return file.Flush() +} + +func (file *StaticLargeObjectCreateFile) Flush() error { + if err := file.conn.createSLOManifest(file.container, file.objectName, file.contentType, file.segmentContainer, file.segments); err != nil { + return err + } + return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size()) +} + +func (c *Connection) getAllSLOSegments(container, path string) (string, []Object, error) { + var ( + segmentList []swiftSegment + segments []Object + segPath string + segmentContainer string + ) + + values := url.Values{} + values.Set("multipart-manifest", "get") + + file, _, err := c.objectOpen(container, path, true, nil, values) + if err != nil { + return "", nil, err + } + + content, err := ioutil.ReadAll(file) + if err != nil { + return "", nil, err + } + + json.Unmarshal(content, &segmentList) + for _, segment := range segmentList { + segmentContainer, segPath = parseFullPath(segment.Name[1:]) + segments = append(segments, Object{ + Name: segPath, + Bytes: segment.Bytes, + Hash: segment.Hash, + }) + } + + return segmentContainer, segments, nil +} diff --git a/vendor/github.com/ncw/swift/swift.go b/vendor/github.com/ncw/swift/swift.go index 4ba276b18..38e696532 100644 --- a/vendor/github.com/ncw/swift/swift.go +++ b/vendor/github.com/ncw/swift/swift.go @@ -11,9 +11,11 @@ import ( "fmt" "hash" "io" + "io/ioutil" "mime" "net/http" "net/url" + "os" "path" "strconv" "strings" @@ -33,6 +35,17 @@ const ( allObjectsChanLimit = 1000 // ...when fetching to a channel ) +// ObjectType is the type of the swift object, regular, static large, +// or dynamic large. +type ObjectType int + +// Values that ObjectType can take +const ( + RegularObjectType ObjectType = iota + StaticLargeObjectType + DynamicLargeObjectType +) + // Connection holds the details of the connection to the swift server. // // You need to provide UserName, ApiKey and AuthUrl when you create a @@ -86,6 +99,7 @@ type Connection struct { Domain string // User's domain name DomainId string // User's domain Id UserName string // UserName for api + UserId string // User Id ApiKey string // Key for api access AuthUrl string // Auth URL Retries int // Retries on error (default is 3) @@ -108,6 +122,139 @@ type Connection struct { client *http.Client Auth Authenticator `json:"-" xml:"-"` // the current authenticator authLock sync.Mutex // lock when R/W StorageUrl, AuthToken, Auth + // swiftInfo is filled after QueryInfo is called + swiftInfo SwiftInfo +} + +// setFromEnv reads the value that param points to (it must be a +// pointer), if it isn't the zero value then it reads the environment +// variable name passed in, parses it according to the type and writes +// it to the pointer. +func setFromEnv(param interface{}, name string) (err error) { + val := os.Getenv(name) + if val == "" { + return + } + switch result := param.(type) { + case *string: + if *result == "" { + *result = val + } + case *int: + if *result == 0 { + *result, err = strconv.Atoi(val) + } + case *bool: + if *result == false { + *result, err = strconv.ParseBool(val) + } + case *time.Duration: + if *result == 0 { + *result, err = time.ParseDuration(val) + } + case *EndpointType: + if *result == EndpointType("") { + *result = EndpointType(val) + } + default: + return newErrorf(0, "can't set var of type %T", param) + } + return err +} + +// ApplyEnvironment reads environment variables and applies them to +// the Connection structure. It won't overwrite any parameters which +// are already set in the Connection struct. +// +// To make a new Connection object entirely from the environment you +// would do: +// +// c := new(Connection) +// err := c.ApplyEnvironment() +// if err != nil { log.Fatal(err) } +// +// The naming of these variables follows the official Openstack naming +// scheme so it should be compatible with OpenStack rc files. +// +// For v1 authentication (obsolete) +// ST_AUTH - Auth URL +// ST_USER - UserName for api +// ST_KEY - Key for api access +// +// For v2 authentication +// OS_AUTH_URL - Auth URL +// OS_USERNAME - UserName for api +// OS_PASSWORD - Key for api access +// OS_TENANT_NAME - Name of the tenant +// OS_TENANT_ID - Id of the tenant +// OS_REGION_NAME - Region to use - default is use first region +// +// For v3 authentication +// OS_AUTH_URL - Auth URL +// OS_USERNAME - UserName for api +// OS_USER_ID - User Id +// OS_PASSWORD - Key for api access +// OS_USER_DOMAIN_NAME - User's domain name +// OS_USER_DOMAIN_ID - User's domain Id +// OS_PROJECT_NAME - Name of the project +// OS_PROJECT_DOMAIN_NAME - Name of the tenant's domain, only needed if it differs from the user domain +// OS_PROJECT_DOMAIN_ID - Id of the tenant's domain, only needed if it differs the from user domain +// OS_TRUST_ID - If of the trust +// OS_REGION_NAME - Region to use - default is use first region +// +// Other +// OS_ENDPOINT_TYPE - Endpoint type public, internal or admin +// ST_AUTH_VERSION - Choose auth version - 1, 2 or 3 or leave at 0 for autodetect +// +// For manual authentication +// OS_STORAGE_URL - storage URL from alternate authentication +// OS_AUTH_TOKEN - Auth Token from alternate authentication +// +// Library specific +// GOSWIFT_RETRIES - Retries on error (default is 3) +// GOSWIFT_USER_AGENT - HTTP User agent (default goswift/1.0) +// GOSWIFT_CONNECT_TIMEOUT - Connect channel timeout with unit, eg "10s", "100ms" (default "10s") +// GOSWIFT_TIMEOUT - Data channel timeout with unit, eg "10s", "100ms" (default "60s") +// GOSWIFT_INTERNAL - Set this to "true" to use the the internal network (obsolete - use OS_ENDPOINT_TYPE) +func (c *Connection) ApplyEnvironment() (err error) { + for _, item := range []struct { + result interface{} + name string + }{ + // Environment variables - keep in same order as Connection + {&c.Domain, "OS_USER_DOMAIN_NAME"}, + {&c.DomainId, "OS_USER_DOMAIN_ID"}, + {&c.UserName, "OS_USERNAME"}, + {&c.UserId, "OS_USER_ID"}, + {&c.ApiKey, "OS_PASSWORD"}, + {&c.AuthUrl, "OS_AUTH_URL"}, + {&c.Retries, "GOSWIFT_RETRIES"}, + {&c.UserAgent, "GOSWIFT_USER_AGENT"}, + {&c.ConnectTimeout, "GOSWIFT_CONNECT_TIMEOUT"}, + {&c.Timeout, "GOSWIFT_TIMEOUT"}, + {&c.Region, "OS_REGION_NAME"}, + {&c.AuthVersion, "ST_AUTH_VERSION"}, + {&c.Internal, "GOSWIFT_INTERNAL"}, + {&c.Tenant, "OS_TENANT_NAME"}, //v2 + {&c.Tenant, "OS_PROJECT_NAME"}, // v3 + {&c.TenantId, "OS_TENANT_ID"}, + {&c.EndpointType, "OS_ENDPOINT_TYPE"}, + {&c.TenantDomain, "OS_PROJECT_DOMAIN_NAME"}, + {&c.TenantDomainId, "OS_PROJECT_DOMAIN_ID"}, + {&c.TrustId, "OS_TRUST_ID"}, + {&c.StorageUrl, "OS_STORAGE_URL"}, + {&c.AuthToken, "OS_AUTH_TOKEN"}, + // v1 auth alternatives + {&c.ApiKey, "ST_KEY"}, + {&c.UserName, "ST_USER"}, + {&c.AuthUrl, "ST_AUTH"}, + } { + err = setFromEnv(item.result, item.name) + if err != nil { + return newErrorf(0, "failed to read env var %q: %v", item.name, err) + } + } + return nil } // Error - all errors generated by this package are of this type. Other error @@ -140,6 +287,7 @@ type errorMap map[int]error var ( // Specific Errors you might want to check for equality + NotModified = newError(304, "Not Modified") BadRequest = newError(400, "Bad Request") AuthorizationFailed = newError(401, "Authorization Failed") ContainerNotFound = newError(404, "Container Not Found") @@ -167,6 +315,7 @@ var ( // Mappings for object errors objectErrorMap = errorMap{ + 304: NotModified, 400: BadRequest, 403: Forbidden, 404: ObjectNotFound, @@ -184,15 +333,32 @@ func checkClose(c io.Closer, err *error) { } } +// drainAndClose discards all data from rd and closes it. +// If an error occurs during Read, it is discarded. +func drainAndClose(rd io.ReadCloser, err *error) { + if rd == nil { + return + } + + _, _ = io.Copy(ioutil.Discard, rd) + cerr := rd.Close() + if err != nil && *err == nil { + *err = cerr + } +} + // parseHeaders checks a response for errors and translates into -// standard errors if necessary. +// standard errors if necessary. If an error is returned, resp.Body +// has been drained and closed. func (c *Connection) parseHeaders(resp *http.Response, errorMap errorMap) error { if errorMap != nil { if err, ok := errorMap[resp.StatusCode]; ok { + drainAndClose(resp.Body, nil) return err } } if resp.StatusCode < 200 || resp.StatusCode > 299 { + drainAndClose(resp.Body, nil) return newErrorf(resp.StatusCode, "HTTP Error: %d: %s", resp.StatusCode, resp.Status) } return nil @@ -305,13 +471,14 @@ again: } if req != nil { timer := time.NewTimer(c.ConnectTimeout) + defer timer.Stop() var resp *http.Response resp, err = c.doTimeoutRequest(timer, req) if err != nil { return } defer func() { - checkClose(resp.Body, &err) + drainAndClose(resp.Body, &err) // Flush the auth connection - we don't want to keep // it open if keepalives were enabled flushKeepaliveConnections(c.Transport) @@ -406,6 +573,24 @@ func (c *Connection) authenticated() bool { // the enabled middlewares and their configuration type SwiftInfo map[string]interface{} +func (i SwiftInfo) SupportsBulkDelete() bool { + _, val := i["bulk_delete"] + return val +} + +func (i SwiftInfo) SupportsSLO() bool { + _, val := i["slo"] + return val +} + +func (i SwiftInfo) SLOMinSegmentSize() int64 { + if slo, ok := i["slo"].(map[string]interface{}); ok { + val, _ := slo["min_segment_size"].(float64) + return int64(val) + } + return 1 +} + // Discover Swift configuration by doing a request against /info func (c *Connection) QueryInfo() (infos SwiftInfo, err error) { infoUrl, err := url.Parse(c.StorageUrl) @@ -413,14 +598,36 @@ func (c *Connection) QueryInfo() (infos SwiftInfo, err error) { return nil, err } infoUrl.Path = path.Join(infoUrl.Path, "..", "..", "info") - resp, err := http.Get(infoUrl.String()) + resp, err := c.client.Get(infoUrl.String()) if err == nil { + if resp.StatusCode != http.StatusOK { + drainAndClose(resp.Body, nil) + return nil, fmt.Errorf("Invalid status code for info request: %d", resp.StatusCode) + } err = readJson(resp, &infos) + if err == nil { + c.authLock.Lock() + c.swiftInfo = infos + c.authLock.Unlock() + } return infos, err } return nil, err } +func (c *Connection) cachedQueryInfo() (infos SwiftInfo, err error) { + c.authLock.Lock() + infos = c.swiftInfo + c.authLock.Unlock() + if infos == nil { + infos, err = c.QueryInfo() + if err != nil { + return + } + } + return infos, nil +} + // RequestOpts contains parameters for Connection.storage. type RequestOpts struct { Container string @@ -444,6 +651,7 @@ type RequestOpts struct { // Any other parameters (if not None) are added to the targetUrl // // Returns a response or an error. If response is returned then +// the resp.Body must be read completely and // resp.Body.Close() must be called on it, unless noResponse is set in // which case the body will be closed in this function // @@ -484,6 +692,7 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response, URL.RawQuery = p.Parameters.Encode() } timer := time.NewTimer(c.ConnectTimeout) + defer timer.Stop() reader := p.Body if reader != nil { reader = newWatchdogReader(reader, c.Timeout, timer) @@ -518,7 +727,7 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response, } // Check to see if token has expired if resp.StatusCode == 401 && retries > 0 { - _ = resp.Body.Close() + drainAndClose(resp.Body, nil) c.UnAuthenticate() retries-- } else { @@ -527,12 +736,12 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response, } if err = c.parseHeaders(resp, p.ErrorMap); err != nil { - _ = resp.Body.Close() return nil, nil, err } headers = readHeaders(resp) if p.NoResponse { - err = resp.Body.Close() + var err error + drainAndClose(resp.Body, &err) if err != nil { return nil, nil, err } @@ -574,7 +783,7 @@ func (c *Connection) storage(p RequestOpts) (resp *http.Response, headers Header // // Closes the response when done func readLines(resp *http.Response) (lines []string, err error) { - defer checkClose(resp.Body, &err) + defer drainAndClose(resp.Body, &err) reader := bufio.NewReader(resp.Body) buffer := bytes.NewBuffer(make([]byte, 0, 128)) var part []byte @@ -599,7 +808,7 @@ func readLines(resp *http.Response) (lines []string, err error) { // // Closes the response when done func readJson(resp *http.Response, result interface{}) (err error) { - defer checkClose(resp.Body, &err) + defer drainAndClose(resp.Body, &err) decoder := json.NewDecoder(resp.Body) return decoder.Decode(result) } @@ -796,14 +1005,15 @@ func (c *Connection) ObjectNames(container string, opts *ObjectsOpts) ([]string, // Object contains information about an object type Object struct { - Name string `json:"name"` // object name - ContentType string `json:"content_type"` // eg application/directory - Bytes int64 `json:"bytes"` // size in bytes - ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server - LastModified time.Time // Last modified time converted to a time.Time - Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e" - PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist - SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories" + Name string `json:"name"` // object name + ContentType string `json:"content_type"` // eg application/directory + Bytes int64 `json:"bytes"` // size in bytes + ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server + LastModified time.Time // Last modified time converted to a time.Time + Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e" + PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist + SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories" + ObjectType ObjectType // type of this object } // Objects returns a slice of Object with information about each @@ -1141,6 +1351,19 @@ func (file *ObjectCreateFile) Close() error { return nil } +// Headers returns the response headers from the created object if the upload +// has been completed. The Close() method must be called on an ObjectCreateFile +// before this method. +func (file *ObjectCreateFile) Headers() (Headers, error) { + // error out if upload is not complete. + select { + case <-file.done: + default: + return nil, fmt.Errorf("Cannot get metadata, object upload failed or has not yet completed.") + } + return file.headers, nil +} + // Check it satisfies the interface var _ io.WriteCloser = &ObjectCreateFile{} @@ -1202,7 +1425,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash } // Run the PUT in the background piping it data go func() { - file.resp, file.headers, file.err = c.storage(RequestOpts{ + opts := RequestOpts{ Container: container, ObjectName: objectName, Operation: "PUT", @@ -1210,7 +1433,8 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash Body: pipeReader, NoResponse: true, ErrorMap: objectErrorMap, - }) + } + file.resp, file.headers, file.err = c.storage(opts) // Signal finished pipeReader.Close() close(file.done) @@ -1218,6 +1442,37 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash return } +func (c *Connection) objectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers, parameters url.Values) (headers Headers, err error) { + extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h) + hash := md5.New() + var body io.Reader = contents + if checkHash { + body = io.TeeReader(contents, hash) + } + _, headers, err = c.storage(RequestOpts{ + Container: container, + ObjectName: objectName, + Operation: "PUT", + Headers: extraHeaders, + Body: body, + NoResponse: true, + ErrorMap: objectErrorMap, + Parameters: parameters, + }) + if err != nil { + return + } + if checkHash { + receivedMd5 := strings.ToLower(headers["Etag"]) + calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil)) + if receivedMd5 != calculatedMd5 { + err = ObjectCorrupted + return + } + } + return +} + // ObjectPut creates or updates the path in the container from // contents. contents should be an open io.Reader which will have all // its contents read. @@ -1240,33 +1495,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash // If contentType is set it will be used, otherwise one will be // guessed from objectName using mime.TypeByExtension func (c *Connection) ObjectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers) (headers Headers, err error) { - extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h) - hash := md5.New() - var body io.Reader = contents - if checkHash { - body = io.TeeReader(contents, hash) - } - _, headers, err = c.storage(RequestOpts{ - Container: container, - ObjectName: objectName, - Operation: "PUT", - Headers: extraHeaders, - Body: body, - NoResponse: true, - ErrorMap: objectErrorMap, - }) - if err != nil { - return - } - if checkHash { - receivedMd5 := strings.ToLower(headers["Etag"]) - calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil)) - if receivedMd5 != calculatedMd5 { - err = ObjectCorrupted - return - } - } - return + return c.objectPut(container, objectName, contents, checkHash, Hash, contentType, h, nil) } // ObjectPutBytes creates an object from a []byte in a container. @@ -1274,7 +1503,8 @@ func (c *Connection) ObjectPut(container string, objectName string, contents io. // This is a simplified interface which checks the MD5. func (c *Connection) ObjectPutBytes(container string, objectName string, contents []byte, contentType string) (err error) { buf := bytes.NewBuffer(contents) - _, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil) + h := Headers{"Content-Length": strconv.Itoa(len(contents))} + _, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h) return } @@ -1283,7 +1513,8 @@ func (c *Connection) ObjectPutBytes(container string, objectName string, content // This is a simplified interface which checks the MD5 func (c *Connection) ObjectPutString(container string, objectName string, contents string, contentType string) (err error) { buf := strings.NewReader(contents) - _, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil) + h := Headers{"Content-Length": strconv.Itoa(len(contents))} + _, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h) return } @@ -1303,10 +1534,14 @@ type ObjectOpenFile struct { lengthOk bool // whether length is valid length int64 // length of the object if read seeked bool // whether we have seeked this file or not + overSeeked bool // set if we have seeked to the end or beyond } // Read bytes from the object - see io.Reader func (file *ObjectOpenFile) Read(p []byte) (n int, err error) { + if file.overSeeked { + return 0, io.EOF + } n, err = file.body.Read(p) file.bytes += int64(n) file.pos += int64(n) @@ -1330,6 +1565,7 @@ func (file *ObjectOpenFile) Read(p []byte) (n int, err error) { // // Seek(0, 1) will return the current file pointer. func (file *ObjectOpenFile) Seek(offset int64, whence int) (newPos int64, err error) { + file.overSeeked = false switch whence { case 0: // relative to start newPos = offset @@ -1340,6 +1576,10 @@ func (file *ObjectOpenFile) Seek(offset int64, whence int) (newPos int64, err er return file.pos, newError(0, "Length of file unknown so can't seek from end") } newPos = file.length + offset + if offset >= 0 { + file.overSeeked = true + return + } default: panic("Unknown whence in ObjectOpenFile.Seek") } @@ -1419,6 +1659,57 @@ func (file *ObjectOpenFile) Close() (err error) { var _ io.ReadCloser = &ObjectOpenFile{} var _ io.Seeker = &ObjectOpenFile{} +func (c *Connection) objectOpenBase(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) { + var resp *http.Response + opts := RequestOpts{ + Container: container, + ObjectName: objectName, + Operation: "GET", + ErrorMap: objectErrorMap, + Headers: h, + Parameters: parameters, + } + resp, headers, err = c.storage(opts) + if err != nil { + return + } + // Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set + if checkHash && headers.IsLargeObject() { + // log.Printf("swift: turning off md5 checking on object with manifest %v", objectName) + checkHash = false + } + file = &ObjectOpenFile{ + connection: c, + container: container, + objectName: objectName, + headers: h, + resp: resp, + checkHash: checkHash, + body: resp.Body, + } + if checkHash { + file.hash = md5.New() + file.body = io.TeeReader(resp.Body, file.hash) + } + // Read Content-Length + if resp.Header.Get("Content-Length") != "" { + file.length, err = getInt64FromHeader(resp, "Content-Length") + file.lengthOk = (err == nil) + } + return +} + +func (c *Connection) objectOpen(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) { + err = withLORetry(0, func() (Headers, int64, error) { + file, headers, err = c.objectOpenBase(container, objectName, checkHash, h, parameters) + if err != nil { + return headers, 0, err + } + return headers, file.length, nil + }) + return +} + // ObjectOpen returns an ObjectOpenFile for reading the contents of // the object. This satisfies the io.ReadCloser and the io.Seeker // interfaces. @@ -1443,41 +1734,7 @@ var _ io.Seeker = &ObjectOpenFile{} // // headers["Content-Type"] will give the content type if desired. func (c *Connection) ObjectOpen(container string, objectName string, checkHash bool, h Headers) (file *ObjectOpenFile, headers Headers, err error) { - var resp *http.Response - resp, headers, err = c.storage(RequestOpts{ - Container: container, - ObjectName: objectName, - Operation: "GET", - ErrorMap: objectErrorMap, - Headers: h, - }) - if err != nil { - return - } - // Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set - if checkHash && (headers["X-Object-Manifest"] != "" || headers["X-Static-Large-Object"] != "") { - // log.Printf("swift: turning off md5 checking on object with manifest %v", objectName) - checkHash = false - } - file = &ObjectOpenFile{ - connection: c, - container: container, - objectName: objectName, - headers: h, - resp: resp, - checkHash: checkHash, - body: resp.Body, - } - if checkHash { - file.hash = md5.New() - file.body = io.TeeReader(resp.Body, file.hash) - } - // Read Content-Length - if resp.Header.Get("Content-Length") != "" { - file.length, err = getInt64FromHeader(resp, "Content-Length") - file.lengthOk = (err == nil) - } - return + return c.objectOpen(container, objectName, checkHash, h, nil) } // ObjectGet gets the object into the io.Writer contents. @@ -1580,19 +1837,10 @@ type BulkDeleteResult struct { Headers Headers // Response HTTP headers. } -// BulkDelete deletes multiple objectNames from container in one operation. -// -// Some servers may not accept bulk-delete requests since bulk-delete is -// an optional feature of swift - these will return the Forbidden error. -// -// See also: -// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html -// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html -func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) { +func (c *Connection) doBulkDelete(objects []string) (result BulkDeleteResult, err error) { var buffer bytes.Buffer - for _, s := range objectNames { - buffer.WriteString(fmt.Sprintf("/%s/%s\n", container, - url.QueryEscape(s))) + for _, s := range objects { + buffer.WriteString(url.QueryEscape(s) + "\n") } resp, headers, err := c.storage(RequestOpts{ Operation: "DELETE", @@ -1633,6 +1881,22 @@ func (c *Connection) BulkDelete(container string, objectNames []string) (result return } +// BulkDelete deletes multiple objectNames from container in one operation. +// +// Some servers may not accept bulk-delete requests since bulk-delete is +// an optional feature of swift - these will return the Forbidden error. +// +// See also: +// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html +// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html +func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) { + fullPaths := make([]string, len(objectNames)) + for i, name := range objectNames { + fullPaths[i] = fmt.Sprintf("/%s/%s", container, name) + } + return c.doBulkDelete(fullPaths) +} + // BulkUploadResult stores results of BulkUpload(). // // Individual errors may (or may not) be returned by Errors. @@ -1716,6 +1980,17 @@ func (c *Connection) BulkUpload(uploadPath string, dataStream io.Reader, format // // Use headers.ObjectMetadata() to read the metadata in the Headers. func (c *Connection) Object(container string, objectName string) (info Object, headers Headers, err error) { + err = withLORetry(0, func() (Headers, int64, error) { + info, headers, err = c.objectBase(container, objectName) + if err != nil { + return headers, 0, err + } + return headers, info.Bytes, nil + }) + return +} + +func (c *Connection) objectBase(container string, objectName string) (info Object, headers Headers, err error) { var resp *http.Response resp, headers, err = c.storage(RequestOpts{ Container: container, @@ -1756,6 +2031,12 @@ func (c *Connection) Object(container string, objectName string) (info Object, h } info.Hash = resp.Header.Get("Etag") + if resp.Header.Get("X-Object-Manifest") != "" { + info.ObjectType = DynamicLargeObjectType + } else if resp.Header.Get("X-Static-Large-Object") != "" { + info.ObjectType = StaticLargeObjectType + } + return } diff --git a/vendor/github.com/ncw/swift/swifttest/server.go b/vendor/github.com/ncw/swift/swifttest/server.go index a49abb9a0..6ec50f609 100644 --- a/vendor/github.com/ncw/swift/swifttest/server.go +++ b/vendor/github.com/ncw/swift/swifttest/server.go @@ -21,6 +21,7 @@ import ( "mime" "net" "net/http" + "net/http/httptest" "net/url" "path" "regexp" @@ -28,6 +29,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -39,21 +41,28 @@ const ( TEST_ACCOUNT = "swifttest" ) +type HandlerOverrideFunc func(w http.ResponseWriter, r *http.Request, recorder *httptest.ResponseRecorder) + type SwiftServer struct { + // `sync/atomic` expects the first word in an allocated struct to be 64-bit + // aligned on both ARM and x86-32. + // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more details. + reqId int64 + sync.RWMutex t *testing.T - reqId int mu sync.Mutex Listener net.Listener AuthURL string URL string Accounts map[string]*account Sessions map[string]*session + override map[string]HandlerOverrideFunc } // The Folder type represents a container stored in an account type Folder struct { - Count int `json:"count"` - Bytes int `json:"bytes"` + Count int64 `json:"count"` + Bytes int64 `json:"bytes"` Name string `json:"name"` } @@ -96,13 +105,16 @@ type metadata struct { } type account struct { + sync.RWMutex swift.Account metadata - password string - Containers map[string]*container + password string + ContainersLock sync.RWMutex + Containers map[string]*container } type object struct { + sync.RWMutex metadata name string mtime time.Time @@ -112,11 +124,31 @@ type object struct { } type container struct { + // `sync/atomic` expects the first word in an allocated struct to be 64-bit + // aligned on both ARM and x86-32. + // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more details. + bytes int64 + sync.RWMutex metadata name string ctime time.Time objects map[string]*object - bytes int +} + +type segment struct { + Path string `json:"path,omitempty"` + Hash string `json:"hash,omitempty"` + Size int64 `json:"size_bytes,omitempty"` + // When uploading a manifest, the attributes must be named `path`, `hash` and `size` + // but when querying the JSON content of a manifest with the `multipart-manifest=get` + // parameter, Swift names those attributes `name`, `etag` and `bytes`. + // We use all the different attributes names in this structure to be able to use + // the same structure for both uploading and retrieving. + Name string `json:"name,omitempty"` + Etag string `json:"etag,omitempty"` + Bytes int64 `json:"bytes,omitempty"` + ContentType string `json:"content_type,omitempty"` + LastModified string `json:"last_modified,omitempty"` } // A resource encapsulates the subject of an HTTP request. @@ -179,9 +211,12 @@ func (m metadata) getMetadata(a *action) { } } -func (c container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) { +func (c *container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) { var tmp orderedObjects + c.RLock() + defer c.RUnlock() + // first get all matching objects and arrange them in alphabetical order. for _, obj := range c.objects { if strings.HasPrefix(obj.name, prefix) { @@ -236,19 +271,23 @@ func (r containerResource) get(a *action) interface{} { fatalf(404, "NoSuchContainer", "The specified container does not exist") } + r.container.RLock() + delimiter := a.req.Form.Get("delimiter") marker := a.req.Form.Get("marker") prefix := a.req.Form.Get("prefix") format := a.req.URL.Query().Get("format") parent := a.req.Form.Get("path") - a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(r.container.bytes)) + a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(int(r.container.bytes))) a.w.Header().Set("X-Container-Object-Count", strconv.Itoa(len(r.container.objects))) r.container.getMetadata(a) if a.req.Method == "HEAD" { + r.container.RUnlock() return nil } + r.container.RUnlock() objects := r.container.list(delimiter, marker, prefix, parent) @@ -297,8 +336,10 @@ func (r containerResource) delete(a *action) interface{} { if len(b.objects) > 0 { fatalf(409, "Conflict", "The container you tried to delete is not empty") } + a.user.Lock() delete(a.user.Containers, b.name) a.user.Account.Containers-- + a.user.Unlock() return nil } @@ -319,8 +360,11 @@ func (r containerResource) put(a *action) interface{} { }, } r.container.setMetadata(a, "container") + + a.user.Lock() a.user.Containers[r.name] = r.container a.user.Account.Containers++ + a.user.Unlock() } return nil @@ -330,10 +374,13 @@ func (r containerResource) post(a *action) interface{} { if r.container == nil { fatalf(400, "Method", "The resource could not be found.") } else { + r.container.RLock() + defer r.container.RUnlock() + r.container.setMetadata(a, "container") a.w.WriteHeader(201) jsonMarshal(a.w, Folder{ - Count: len(r.container.objects), + Count: int64(len(r.container.objects)), Bytes: r.container.bytes, Name: r.container.name, }) @@ -388,10 +435,11 @@ func (obj *object) Key() Key { } var metaHeaders = map[string]bool{ - "Content-Type": true, - "Content-Encoding": true, - "Content-Disposition": true, - "X-Object-Manifest": true, + "Content-Type": true, + "Content-Encoding": true, + "Content-Disposition": true, + "X-Object-Manifest": true, + "X-Static-Large-Object": true, } var rangeRegexp = regexp.MustCompile("(bytes=)?([0-9]*)-([0-9]*)") @@ -409,6 +457,9 @@ func (objr objectResource) get(a *action) interface{} { fatalf(404, "Not Found", "The resource could not be found.") } + obj.RLock() + defer obj.RUnlock() + h := a.w.Header() // add metadata obj.getMetadata(a) @@ -433,7 +484,9 @@ func (objr objectResource) get(a *action) interface{} { if manifest, ok := obj.meta["X-Object-Manifest"]; ok { var segments []io.Reader components := strings.SplitN(manifest[0], "/", 2) + a.user.RLock() segContainer := a.user.Containers[components[0]] + a.user.RUnlock() prefix := components[1] resp := segContainer.list("", "", prefix, "") sum := md5.New() @@ -453,19 +506,54 @@ func (objr objectResource) get(a *action) interface{} { } etag = sum.Sum(nil) if end == -1 { - end = size + end = size - 1 } - reader = io.LimitReader(io.MultiReader(segments...), int64(end-start)) + reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1)) + } else if value, ok := obj.meta["X-Static-Large-Object"]; ok && value[0] == "True" && a.req.URL.Query().Get("multipart-manifest") != "get" { + var segments []io.Reader + var segmentList []segment + json.Unmarshal(obj.data, &segmentList) + cursor := 0 + size := 0 + sum := md5.New() + for _, segment := range segmentList { + components := strings.SplitN(segment.Name[1:], "/", 2) + a.user.RLock() + segContainer := a.user.Containers[components[0]] + a.user.RUnlock() + objectName := components[1] + segObject := segContainer.objects[objectName] + length := len(segObject.data) + size += length + sum.Write([]byte(hex.EncodeToString(segObject.checksum))) + if start >= cursor+length { + continue + } + segments = append(segments, bytes.NewReader(segObject.data[max(0, start-cursor):])) + cursor += length + } + etag = sum.Sum(nil) + if end == -1 { + end = size - 1 + } + reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1)) } else { if end == -1 { - end = len(obj.data) + end = len(obj.data) - 1 } etag = obj.checksum - reader = bytes.NewReader(obj.data[start:end]) + reader = bytes.NewReader(obj.data[start : end+1]) } - h.Set("Content-Length", fmt.Sprint(end-start)) - h.Set("ETag", hex.EncodeToString(etag)) + etagHex := hex.EncodeToString(etag) + + if a.req.Header.Get("If-None-Match") == etagHex { + a.w.WriteHeader(http.StatusNotModified) + return nil + } + + h.Set("Content-Length", fmt.Sprint(end-start+1)) + h.Set("ETag", etagHex) h.Set("Last-Modified", obj.mtime.Format(http.TimeFormat)) if a.req.Method == "HEAD" { @@ -514,10 +602,10 @@ func (objr objectResource) put(a *action) interface{} { meta: make(http.Header), }, } - a.user.Objects++ + atomic.AddInt64(&a.user.Objects, 1) } else { - objr.container.bytes -= len(obj.data) - a.user.BytesUsed -= int64(len(obj.data)) + atomic.AddInt64(&objr.container.bytes, -int64(len(obj.data))) + atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj.data))) } var content_type string @@ -528,15 +616,39 @@ func (objr objectResource) put(a *action) interface{} { } } + if a.req.URL.Query().Get("multipart-manifest") == "put" { + // TODO: check the content of the SLO + a.req.Header.Set("X-Static-Large-Object", "True") + + var segments []segment + json.Unmarshal(data, &segments) + for i := range segments { + segments[i].Name = "/" + segments[i].Path + segments[i].Path = "" + segments[i].Hash = segments[i].Etag + segments[i].Etag = "" + segments[i].Bytes = segments[i].Size + segments[i].Size = 0 + } + + data, _ = json.Marshal(segments) + sum = md5.New() + sum.Write(data) + gotHash = sum.Sum(nil) + } + // PUT request has been successful - save data and metadata obj.setMetadata(a, "object") obj.content_type = content_type obj.data = data obj.checksum = gotHash obj.mtime = time.Now().UTC() + objr.container.Lock() objr.container.objects[objr.name] = obj - objr.container.bytes += len(data) - a.user.BytesUsed += int64(len(data)) + objr.container.bytes += int64(len(data)) + objr.container.Unlock() + + atomic.AddInt64(&a.user.BytesUsed, int64(len(data))) h := a.w.Header() h.Set("ETag", hex.EncodeToString(obj.checksum)) @@ -549,14 +661,25 @@ func (objr objectResource) delete(a *action) interface{} { fatalf(404, "NoSuchKey", "The specified key does not exist.") } - objr.container.bytes -= len(objr.object.data) - a.user.BytesUsed -= int64(len(objr.object.data)) + objr.container.Lock() + defer objr.container.Unlock() + + objr.object.Lock() + defer objr.object.Unlock() + + objr.container.bytes -= int64(len(objr.object.data)) delete(objr.container.objects, objr.name) - a.user.Objects-- + + atomic.AddInt64(&a.user.BytesUsed, -int64(len(objr.object.data))) + atomic.AddInt64(&a.user.Objects, -1) + return nil } func (objr objectResource) post(a *action) interface{} { + objr.object.Lock() + defer objr.object.Unlock() + obj := objr.object obj.setMetadata(a, "object") return nil @@ -568,6 +691,9 @@ func (objr objectResource) copy(a *action) interface{} { } obj := objr.object + obj.RLock() + defer obj.RUnlock() + destination := a.req.Header.Get("Destination") if destination == "" { fatalf(400, "Bad Request", "You must provide a Destination header") @@ -590,29 +716,38 @@ func (objr objectResource) copy(a *action) interface{} { meta: make(http.Header), }, } - a.user.Objects++ + atomic.AddInt64(&a.user.Objects, 1) } else { obj2 = objr2.object - objr2.container.bytes -= len(obj2.data) - a.user.BytesUsed -= int64(len(obj2.data)) + atomic.AddInt64(&objr2.container.bytes, -int64(len(obj2.data))) + atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj2.data))) } default: fatalf(400, "Bad Request", "Destination must point to a valid object path") } + if objr2.container.name != objr2.container.name && obj2.name != obj.name { + obj2.Lock() + defer obj2.Unlock() + } + obj2.content_type = obj.content_type obj2.data = obj.data obj2.checksum = obj.checksum obj2.mtime = time.Now() - objr2.container.objects[objr2.name] = obj2 - objr2.container.bytes += len(obj.data) - a.user.BytesUsed += int64(len(obj.data)) for key, values := range obj.metadata.meta { obj2.metadata.meta[key] = values } obj2.setMetadata(a, "object") + objr2.container.Lock() + objr2.container.objects[objr2.name] = obj2 + objr2.container.bytes += int64(len(obj.data)) + objr2.container.Unlock() + + atomic.AddInt64(&a.user.BytesUsed, int64(len(obj.data))) + return nil } @@ -620,8 +755,14 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { // ignore error from ParseForm as it's usually spurious. req.ParseForm() - s.mu.Lock() - defer s.mu.Unlock() + if fn := s.override[req.URL.Path]; fn != nil { + originalRW := w + recorder := httptest.NewRecorder() + w = recorder + defer func() { + fn(originalRW, req, recorder) + }() + } if DEBUG { log.Printf("swifttest %q %q", req.Method, req.URL) @@ -630,9 +771,9 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { srv: s, w: w, req: req, - reqId: fmt.Sprintf("%09X", s.reqId), + reqId: fmt.Sprintf("%09X", atomic.LoadInt64(&s.reqId)), } - s.reqId++ + atomic.AddInt64(&s.reqId, 1) var r resource defer func() { @@ -651,6 +792,8 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { if req.URL.String() == "/v1.0" { username := req.Header.Get("x-auth-user") key := req.Header.Get("x-auth-key") + s.Lock() + defer s.Unlock() if acct, ok := s.Accounts[username]; ok { if acct.password == key { r := make([]byte, 16) @@ -676,6 +819,11 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { "tempurl": map[string]interface{}{ "methods": []string{"GET", "HEAD", "PUT"}, }, + "slo": map[string]interface{}{ + "max_manifest_segments": 1000, + "max_manifest_size": 2097152, + "min_segment_size": 1, + }, }) return } @@ -688,9 +836,11 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { if key == "" && signature != "" && expires != "" { accountName, _, _, _ := s.parseURL(req.URL) secretKey := "" + s.RLock() if account, ok := s.Accounts[accountName]; ok { secretKey = account.meta.Get("X-Account-Meta-Temp-Url-Key") } + s.RUnlock() get_hmac := func(method string) string { mac := hmac.New(sha1.New, []byte(secretKey)) @@ -707,12 +857,16 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { panic(notAuthorized()) } } else { + s.RLock() session, ok := s.Sessions[key[7:]] if !ok { + s.RUnlock() panic(notAuthorized()) + return } a.user = s.Accounts[session.username] + s.RUnlock() } switch req.Method { @@ -746,6 +900,14 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { } } +func (s *SwiftServer) SetOverride(path string, fn HandlerOverrideFunc) { + s.override[path] = fn +} + +func (s *SwiftServer) UnsetOverride(path string) { + delete(s.override, path) +} + func jsonMarshal(w io.Writer, x interface{}) { if err := json.NewEncoder(w).Encode(x); err != nil { panic(fmt.Errorf("error marshalling %#v: %v", x, err)) @@ -773,14 +935,21 @@ func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) { fatalf(404, "InvalidURI", err.Error()) } + srv.RLock() account, ok := srv.Accounts[accountName] if !ok { + //srv.RUnlock() fatalf(404, "NoSuchAccount", "The specified account does not exist") } + srv.RUnlock() + account.RLock() if containerName == "" { + account.RUnlock() return rootResource{} } + account.RUnlock() + b := containerResource{ name: containerName, container: account.Containers[containerName], @@ -800,6 +969,8 @@ func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) { container: b.container, } + objr.container.RLock() + defer objr.container.RUnlock() if obj := objr.container.objects[objr.name]; obj != nil { objr.object = obj } @@ -835,9 +1006,12 @@ func (rootResource) get(a *action) interface{} { h := a.w.Header() - h.Set("X-Account-Bytes-Used", strconv.Itoa(int(a.user.BytesUsed))) - h.Set("X-Account-Container-Count", strconv.Itoa(int(a.user.Account.Containers))) - h.Set("X-Account-Object-Count", strconv.Itoa(int(a.user.Objects))) + h.Set("X-Account-Bytes-Used", strconv.Itoa(int(atomic.LoadInt64(&a.user.BytesUsed)))) + h.Set("X-Account-Container-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Account.Containers)))) + h.Set("X-Account-Object-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Objects)))) + + a.user.RLock() + defer a.user.RUnlock() // add metadata a.user.metadata.getMetadata(a) @@ -862,7 +1036,7 @@ func (rootResource) get(a *action) interface{} { } if format == "json" { resp = append(resp, Folder{ - Count: len(container.objects), + Count: int64(len(container.objects)), Bytes: container.bytes, Name: container.name, }) @@ -879,7 +1053,9 @@ func (rootResource) get(a *action) interface{} { } func (r rootResource) post(a *action) interface{} { + a.user.Lock() a.user.metadata.setMetadata(a, "account") + a.user.Unlock() return nil } @@ -894,21 +1070,10 @@ func (rootResource) delete(a *action) interface{} { func (rootResource) copy(a *action) interface{} { return notAllowed() } func NewSwiftServer(address string) (*SwiftServer, error) { - var ( - l net.Listener - err error - ) if strings.Index(address, ":") == -1 { - for port := 1024; port < 65535; port++ { - addr := fmt.Sprintf("%s:%d", address, port) - if l, err = net.Listen("tcp", addr); err == nil { - address = addr - break - } - } - } else { - l, err = net.Listen("tcp", address) + address += ":0" } + l, err := net.Listen("tcp", address) if err != nil { return nil, fmt.Errorf("cannot listen on %s: %v", address, err) } @@ -919,6 +1084,7 @@ func NewSwiftServer(address string) (*SwiftServer, error) { URL: "http://" + l.Addr().String() + "/v1", Accounts: make(map[string]*account), Sessions: make(map[string]*session), + override: make(map[string]HandlerOverrideFunc), } server.Accounts[TEST_ACCOUNT] = &account{ diff --git a/vendor/github.com/ncw/swift/timeout_reader.go b/vendor/github.com/ncw/swift/timeout_reader.go index 3839e9ea0..88ae73328 100644 --- a/vendor/github.com/ncw/swift/timeout_reader.go +++ b/vendor/github.com/ncw/swift/timeout_reader.go @@ -38,10 +38,12 @@ func (t *timeoutReader) Read(p []byte) (int, error) { done <- result{n, err} }() // Wait for the read or the timeout + timer := time.NewTimer(t.timeout) + defer timer.Stop() select { case r := <-done: return r.n, r.err - case <-time.After(t.timeout): + case <-timer.C: t.cancel() return 0, TimeoutError } diff --git a/vendor/github.com/ncw/swift/watchdog_reader.go b/vendor/github.com/ncw/swift/watchdog_reader.go index b12b1bbe2..2714c9e1a 100644 --- a/vendor/github.com/ncw/swift/watchdog_reader.go +++ b/vendor/github.com/ncw/swift/watchdog_reader.go @@ -5,29 +5,50 @@ import ( "time" ) +var watchdogChunkSize = 1 << 20 // 1 MiB + // An io.Reader which resets a watchdog timer whenever data is read type watchdogReader struct { - timeout time.Duration - reader io.Reader - timer *time.Timer + timeout time.Duration + reader io.Reader + timer *time.Timer + chunkSize int } // Returns a new reader which will kick the watchdog timer whenever data is read func newWatchdogReader(reader io.Reader, timeout time.Duration, timer *time.Timer) *watchdogReader { return &watchdogReader{ - timeout: timeout, - reader: reader, - timer: timer, + timeout: timeout, + reader: reader, + timer: timer, + chunkSize: watchdogChunkSize, } } // Read reads up to len(p) bytes into p -func (t *watchdogReader) Read(p []byte) (n int, err error) { - // FIXME limit the amount of data read in one chunk so as to not exceed the timeout? +func (t *watchdogReader) Read(p []byte) (int, error) { + //read from underlying reader in chunks not larger than t.chunkSize + //while resetting the watchdog timer before every read; the small chunk + //size ensures that the timer does not fire when reading a large amount of + //data from a slow connection + start := 0 + end := len(p) + for start < end { + length := end - start + if length > t.chunkSize { + length = t.chunkSize + } + + resetTimer(t.timer, t.timeout) + n, err := t.reader.Read(p[start : start+length]) + start += n + if n == 0 || err != nil { + return start, err + } + } + resetTimer(t.timer, t.timeout) - n, err = t.reader.Read(p) - resetTimer(t.timer, t.timeout) - return + return start, nil } // Check it satisfies the interface