forked from TrueCloudLab/distribution
update github.com/ncw/swift package in vendor to avoid potential memory leaks
Signed-off-by: mlmhl <409107750@qq.com>
This commit is contained in:
parent
9930542dc5
commit
5a74b806f0
10 changed files with 1387 additions and 162 deletions
|
@ -23,11 +23,11 @@ github.com/satori/go.uuid f58768cc1a7a7e77a3bd49e98cdd21419399b6a3
|
|||
github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c
|
||||
github.com/miekg/dns 271c58e0c14f552178ea321a545ff9af38930f39
|
||||
github.com/mitchellh/mapstructure 482a9fd5fa83e8c4e7817413b80f3eb8feec03ef
|
||||
github.com/ncw/swift b964f2ca856aac39885e258ad25aec08d5f64ee6
|
||||
github.com/prometheus/client_golang c332b6f63c0658a65eca15c0e5247ded801cf564
|
||||
github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c
|
||||
github.com/prometheus/common 89604d197083d4781071d3c65855d24ecfb0a563
|
||||
github.com/prometheus/procfs cb4147076ac75738c9a7d279075a253c0cc5acbd
|
||||
github.com/ncw/swift c95c6e5c2d1a3d37fc44c8c6dc9e231c7500667d
|
||||
github.com/spf13/cobra 312092086bed4968099259622145a0c9ae280064
|
||||
github.com/spf13/pflag 5644820622454e71517561946e3d94b9f9db6842
|
||||
github.com/stevvooe/resumable 2aaf90b2ceea5072cb503ef2a620b08ff3119870
|
||||
|
|
2
vendor/github.com/ncw/swift/README.md
generated
vendored
2
vendor/github.com/ncw/swift/README.md
generated
vendored
|
@ -138,3 +138,5 @@ Contributors
|
|||
- Cezar Sa Espinola <cezarsa@gmail.com>
|
||||
- Sam Gunaratne <samgzeit@gmail.com>
|
||||
- Richard Scothern <richard.scothern@gmail.com>
|
||||
- Michel Couillard <couillard.michel@voxlog.ca>
|
||||
- Christopher Waldon <ckwaldon@us.ibm.com>
|
||||
|
|
3
vendor/github.com/ncw/swift/auth_v3.go
generated
vendored
3
vendor/github.com/ncw/swift/auth_v3.go
generated
vendored
|
@ -117,7 +117,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) {
|
|||
|
||||
v3 := v3AuthRequest{}
|
||||
|
||||
if c.UserName == "" {
|
||||
if c.UserName == "" && c.UserId == "" {
|
||||
v3.Auth.Identity.Methods = []string{v3AuthMethodToken}
|
||||
v3.Auth.Identity.Token = &v3AuthToken{Id: c.ApiKey}
|
||||
} else {
|
||||
|
@ -125,6 +125,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) {
|
|||
v3.Auth.Identity.Password = &v3AuthPassword{
|
||||
User: v3User{
|
||||
Name: c.UserName,
|
||||
Id: c.UserId,
|
||||
Password: c.ApiKey,
|
||||
},
|
||||
}
|
||||
|
|
136
vendor/github.com/ncw/swift/dlo.go
generated
vendored
Normal file
136
vendor/github.com/ncw/swift/dlo.go
generated
vendored
Normal file
|
@ -0,0 +1,136 @@
|
|||
package swift
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
// DynamicLargeObjectCreateFile represents an open static large object
|
||||
type DynamicLargeObjectCreateFile struct {
|
||||
largeObjectCreateFile
|
||||
}
|
||||
|
||||
// DynamicLargeObjectCreateFile creates a dynamic large object
|
||||
// returning an object which satisfies io.Writer, io.Seeker, io.Closer
|
||||
// and io.ReaderFrom. The flags are as passes to the
|
||||
// largeObjectCreate method.
|
||||
func (c *Connection) DynamicLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
lo, err := c.largeObjectCreate(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return withBuffer(opts, &DynamicLargeObjectCreateFile{
|
||||
largeObjectCreateFile: *lo,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// DynamicLargeObjectCreate creates or truncates an existing dynamic
|
||||
// large object returning a writeable object. This sets opts.Flags to
|
||||
// an appropriate value before calling DynamicLargeObjectCreateFile
|
||||
func (c *Connection) DynamicLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
opts.Flags = os.O_TRUNC | os.O_CREATE
|
||||
return c.DynamicLargeObjectCreateFile(opts)
|
||||
}
|
||||
|
||||
// DynamicLargeObjectDelete deletes a dynamic large object and all of its segments.
|
||||
func (c *Connection) DynamicLargeObjectDelete(container string, path string) error {
|
||||
return c.LargeObjectDelete(container, path)
|
||||
}
|
||||
|
||||
// DynamicLargeObjectMove moves a dynamic large object from srcContainer, srcObjectName to dstContainer, dstObjectName
|
||||
func (c *Connection) DynamicLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
|
||||
info, headers, err := c.Object(dstContainer, srcObjectName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
segmentContainer, segmentPath := parseFullPath(headers["X-Object-Manifest"])
|
||||
if err := c.createDLOManifest(dstContainer, dstObjectName, segmentContainer+"/"+segmentPath, info.ContentType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createDLOManifest creates a dynamic large object manifest
|
||||
func (c *Connection) createDLOManifest(container string, objectName string, prefix string, contentType string) error {
|
||||
headers := make(Headers)
|
||||
headers["X-Object-Manifest"] = prefix
|
||||
manifest, err := c.ObjectCreate(container, objectName, false, "", contentType, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := manifest.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close satisfies the io.Closer interface
|
||||
func (file *DynamicLargeObjectCreateFile) Close() error {
|
||||
return file.Flush()
|
||||
}
|
||||
|
||||
func (file *DynamicLargeObjectCreateFile) Flush() error {
|
||||
err := file.conn.createDLOManifest(file.container, file.objectName, file.segmentContainer+"/"+file.prefix, file.contentType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size())
|
||||
}
|
||||
|
||||
func (c *Connection) getAllDLOSegments(segmentContainer, segmentPath string) ([]Object, error) {
|
||||
//a simple container listing works 99.9% of the time
|
||||
segments, err := c.ObjectsAll(segmentContainer, &ObjectsOpts{Prefix: segmentPath})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hasObjectName := make(map[string]struct{})
|
||||
for _, segment := range segments {
|
||||
hasObjectName[segment.Name] = struct{}{}
|
||||
}
|
||||
|
||||
//The container listing might be outdated (i.e. not contain all existing
|
||||
//segment objects yet) because of temporary inconsistency (Swift is only
|
||||
//eventually consistent!). Check its completeness.
|
||||
segmentNumber := 0
|
||||
for {
|
||||
segmentNumber++
|
||||
segmentName := getSegment(segmentPath, segmentNumber)
|
||||
if _, seen := hasObjectName[segmentName]; seen {
|
||||
continue
|
||||
}
|
||||
|
||||
//This segment is missing in the container listing. Use a more reliable
|
||||
//request to check its existence. (HEAD requests on segments are
|
||||
//guaranteed to return the correct metadata, except for the pathological
|
||||
//case of an outage of large parts of the Swift cluster or its network,
|
||||
//since every segment is only written once.)
|
||||
segment, _, err := c.Object(segmentContainer, segmentName)
|
||||
switch err {
|
||||
case nil:
|
||||
//found new segment -> add it in the correct position and keep
|
||||
//going, more might be missing
|
||||
if segmentNumber <= len(segments) {
|
||||
segments = append(segments[:segmentNumber], segments[segmentNumber-1:]...)
|
||||
segments[segmentNumber-1] = segment
|
||||
} else {
|
||||
segments = append(segments, segment)
|
||||
}
|
||||
continue
|
||||
case ObjectNotFound:
|
||||
//This segment is missing. Since we upload segments sequentially,
|
||||
//there won't be any more segments after it.
|
||||
return segments, nil
|
||||
default:
|
||||
return nil, err //unexpected error
|
||||
}
|
||||
}
|
||||
}
|
448
vendor/github.com/ncw/swift/largeobjects.go
generated
vendored
Normal file
448
vendor/github.com/ncw/swift/largeobjects.go
generated
vendored
Normal file
|
@ -0,0 +1,448 @@
|
|||
package swift
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
gopath "path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NotLargeObject is returned if an operation is performed on an object which isn't large.
|
||||
var NotLargeObject = errors.New("Not a large object")
|
||||
|
||||
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
|
||||
var readAfterWriteTimeout = 15 * time.Second
|
||||
|
||||
// readAfterWriteWait defines the time to sleep between two retries
|
||||
var readAfterWriteWait = 200 * time.Millisecond
|
||||
|
||||
// largeObjectCreateFile represents an open static or dynamic large object
|
||||
type largeObjectCreateFile struct {
|
||||
conn *Connection
|
||||
container string
|
||||
objectName string
|
||||
currentLength int64
|
||||
filePos int64
|
||||
chunkSize int64
|
||||
segmentContainer string
|
||||
prefix string
|
||||
contentType string
|
||||
checkHash bool
|
||||
segments []Object
|
||||
headers Headers
|
||||
minChunkSize int64
|
||||
}
|
||||
|
||||
func swiftSegmentPath(path string) (string, error) {
|
||||
checksum := sha1.New()
|
||||
random := make([]byte, 32)
|
||||
if _, err := rand.Read(random); err != nil {
|
||||
return "", err
|
||||
}
|
||||
path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
|
||||
return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
|
||||
}
|
||||
|
||||
func getSegment(segmentPath string, partNumber int) string {
|
||||
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
|
||||
}
|
||||
|
||||
func parseFullPath(manifest string) (container string, prefix string) {
|
||||
components := strings.SplitN(manifest, "/", 2)
|
||||
container = components[0]
|
||||
if len(components) > 1 {
|
||||
prefix = components[1]
|
||||
}
|
||||
return container, prefix
|
||||
}
|
||||
|
||||
func (headers Headers) IsLargeObjectDLO() bool {
|
||||
_, isDLO := headers["X-Object-Manifest"]
|
||||
return isDLO
|
||||
}
|
||||
|
||||
func (headers Headers) IsLargeObjectSLO() bool {
|
||||
_, isSLO := headers["X-Static-Large-Object"]
|
||||
return isSLO
|
||||
}
|
||||
|
||||
func (headers Headers) IsLargeObject() bool {
|
||||
return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
|
||||
}
|
||||
|
||||
func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
|
||||
if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
|
||||
segmentContainer, segmentPath := parseFullPath(manifest)
|
||||
segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
|
||||
return segmentContainer, segments, err
|
||||
}
|
||||
if headers.IsLargeObjectSLO() {
|
||||
return c.getAllSLOSegments(container, path)
|
||||
}
|
||||
return "", nil, NotLargeObject
|
||||
}
|
||||
|
||||
// LargeObjectOpts describes how a large object should be created
|
||||
type LargeObjectOpts struct {
|
||||
Container string // Name of container to place object
|
||||
ObjectName string // Name of object
|
||||
Flags int // Creation flags
|
||||
CheckHash bool // If set Check the hash
|
||||
Hash string // If set use this hash to check
|
||||
ContentType string // Content-Type of the object
|
||||
Headers Headers // Additional headers to upload the object with
|
||||
ChunkSize int64 // Size of chunks of the object, defaults to 10MB if not set
|
||||
MinChunkSize int64 // Minimum chunk size, automatically set for SLO's based on info
|
||||
SegmentContainer string // Name of the container to place segments
|
||||
SegmentPrefix string // Prefix to use for the segments
|
||||
NoBuffer bool // Prevents using a bufio.Writer to write segments
|
||||
}
|
||||
|
||||
type LargeObjectFile interface {
|
||||
io.Writer
|
||||
io.Seeker
|
||||
io.Closer
|
||||
Size() int64
|
||||
Flush() error
|
||||
}
|
||||
|
||||
// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
|
||||
//
|
||||
// opts.Flags can have the following bits set
|
||||
// os.TRUNC - remove the contents of the large object if it exists
|
||||
// os.APPEND - write at the end of the large object
|
||||
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
|
||||
var (
|
||||
segmentPath string
|
||||
segmentContainer string
|
||||
segments []Object
|
||||
currentLength int64
|
||||
err error
|
||||
)
|
||||
|
||||
if opts.SegmentPrefix != "" {
|
||||
segmentPath = opts.SegmentPrefix
|
||||
} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
|
||||
if opts.Flags&os.O_TRUNC != 0 {
|
||||
c.LargeObjectDelete(opts.Container, opts.ObjectName)
|
||||
} else {
|
||||
currentLength = info.Bytes
|
||||
if headers.IsLargeObject() {
|
||||
segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(segments) > 0 {
|
||||
segmentPath = gopath.Dir(segments[0].Name)
|
||||
}
|
||||
} else {
|
||||
if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
segments = append(segments, info)
|
||||
}
|
||||
}
|
||||
} else if err != ObjectNotFound {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// segmentContainer is not empty when the manifest already existed
|
||||
if segmentContainer == "" {
|
||||
if opts.SegmentContainer != "" {
|
||||
segmentContainer = opts.SegmentContainer
|
||||
} else {
|
||||
segmentContainer = opts.Container + "_segments"
|
||||
}
|
||||
}
|
||||
|
||||
file := &largeObjectCreateFile{
|
||||
conn: c,
|
||||
checkHash: opts.CheckHash,
|
||||
container: opts.Container,
|
||||
objectName: opts.ObjectName,
|
||||
chunkSize: opts.ChunkSize,
|
||||
minChunkSize: opts.MinChunkSize,
|
||||
headers: opts.Headers,
|
||||
segmentContainer: segmentContainer,
|
||||
prefix: segmentPath,
|
||||
segments: segments,
|
||||
currentLength: currentLength,
|
||||
}
|
||||
|
||||
if file.chunkSize == 0 {
|
||||
file.chunkSize = 10 * 1024 * 1024
|
||||
}
|
||||
|
||||
if file.minChunkSize > file.chunkSize {
|
||||
file.chunkSize = file.minChunkSize
|
||||
}
|
||||
|
||||
if opts.Flags&os.O_APPEND != 0 {
|
||||
file.filePos = currentLength
|
||||
}
|
||||
|
||||
return file, nil
|
||||
}
|
||||
|
||||
// LargeObjectDelete deletes the large object named by container, path
|
||||
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
|
||||
_, headers, err := c.Object(container, objectName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var objects [][]string
|
||||
if headers.IsLargeObject() {
|
||||
segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, obj := range segments {
|
||||
objects = append(objects, []string{segmentContainer, obj.Name})
|
||||
}
|
||||
}
|
||||
objects = append(objects, []string{container, objectName})
|
||||
|
||||
info, err := c.cachedQueryInfo()
|
||||
if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
|
||||
filenames := make([]string, len(objects))
|
||||
for i, obj := range objects {
|
||||
filenames[i] = obj[0] + "/" + obj[1]
|
||||
}
|
||||
_, err = c.doBulkDelete(filenames)
|
||||
// Don't fail on ObjectNotFound because eventual consistency
|
||||
// makes this situation normal.
|
||||
if err != nil && err != Forbidden && err != ObjectNotFound {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
for _, obj := range objects {
|
||||
if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LargeObjectGetSegments returns all the segments that compose an object
|
||||
// If the object is a Dynamic Large Object (DLO), it just returns the objects
|
||||
// that have the prefix as indicated by the manifest.
|
||||
// If the object is a Static Large Object (SLO), it retrieves the JSON content
|
||||
// of the manifest and return all the segments of it.
|
||||
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
|
||||
_, headers, err := c.Object(container, path)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return c.getAllSegments(container, path, headers)
|
||||
}
|
||||
|
||||
// Seek sets the offset for the next write operation
|
||||
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case 0:
|
||||
file.filePos = offset
|
||||
case 1:
|
||||
file.filePos += offset
|
||||
case 2:
|
||||
file.filePos = file.currentLength + offset
|
||||
default:
|
||||
return -1, fmt.Errorf("invalid value for whence")
|
||||
}
|
||||
if file.filePos < 0 {
|
||||
return -1, fmt.Errorf("negative offset")
|
||||
}
|
||||
return file.filePos, nil
|
||||
}
|
||||
|
||||
func (file *largeObjectCreateFile) Size() int64 {
|
||||
return file.currentLength
|
||||
}
|
||||
|
||||
func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
|
||||
endTimer := time.NewTimer(readAfterWriteTimeout)
|
||||
defer endTimer.Stop()
|
||||
waitingTime := readAfterWriteWait
|
||||
for {
|
||||
var headers Headers
|
||||
var sz int64
|
||||
if headers, sz, err = fn(); err == nil {
|
||||
if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
waitTimer := time.NewTimer(waitingTime)
|
||||
select {
|
||||
case <-endTimer.C:
|
||||
waitTimer.Stop()
|
||||
err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
|
||||
return
|
||||
case <-waitTimer.C:
|
||||
waitingTime *= 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
|
||||
err = withLORetry(expectedSize, func() (Headers, int64, error) {
|
||||
var info Object
|
||||
var headers Headers
|
||||
info, headers, err = c.objectBase(container, objectName)
|
||||
if err != nil {
|
||||
return headers, 0, err
|
||||
}
|
||||
return headers, info.Bytes, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Write satisfies the io.Writer interface
|
||||
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
|
||||
var sz int64
|
||||
var relativeFilePos int
|
||||
writeSegmentIdx := 0
|
||||
for i, obj := range file.segments {
|
||||
if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
|
||||
relativeFilePos = int(file.filePos - sz)
|
||||
break
|
||||
}
|
||||
writeSegmentIdx++
|
||||
sz += obj.Bytes
|
||||
}
|
||||
sizeToWrite := len(buf)
|
||||
for offset := 0; offset < sizeToWrite; {
|
||||
newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if writeSegmentIdx < len(file.segments) {
|
||||
file.segments[writeSegmentIdx] = *newSegment
|
||||
} else {
|
||||
file.segments = append(file.segments, *newSegment)
|
||||
}
|
||||
offset += n
|
||||
writeSegmentIdx++
|
||||
relativeFilePos = 0
|
||||
}
|
||||
file.filePos += int64(sizeToWrite)
|
||||
file.currentLength = 0
|
||||
for _, obj := range file.segments {
|
||||
file.currentLength += obj.Bytes
|
||||
}
|
||||
return sizeToWrite, nil
|
||||
}
|
||||
|
||||
func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
|
||||
var (
|
||||
readers []io.Reader
|
||||
existingSegment *Object
|
||||
segmentSize int
|
||||
)
|
||||
segmentName := getSegment(file.prefix, writeSegmentIdx+1)
|
||||
sizeToRead := int(file.chunkSize)
|
||||
if writeSegmentIdx < len(file.segments) {
|
||||
existingSegment = &file.segments[writeSegmentIdx]
|
||||
if writeSegmentIdx != len(file.segments)-1 {
|
||||
sizeToRead = int(existingSegment.Bytes)
|
||||
}
|
||||
if relativeFilePos > 0 {
|
||||
headers := make(Headers)
|
||||
headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
|
||||
existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer existingSegmentReader.Close()
|
||||
sizeToRead -= relativeFilePos
|
||||
segmentSize += relativeFilePos
|
||||
readers = []io.Reader{existingSegmentReader}
|
||||
}
|
||||
}
|
||||
if sizeToRead > len(buf) {
|
||||
sizeToRead = len(buf)
|
||||
}
|
||||
segmentSize += sizeToRead
|
||||
readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
|
||||
if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
|
||||
headers := make(Headers)
|
||||
headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
|
||||
tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer tailSegmentReader.Close()
|
||||
segmentSize = int(existingSegment.Bytes)
|
||||
readers = append(readers, tailSegmentReader)
|
||||
}
|
||||
segmentReader := io.MultiReader(readers...)
|
||||
headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
|
||||
}
|
||||
|
||||
func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
|
||||
if !opts.NoBuffer {
|
||||
return &bufferedLargeObjectFile{
|
||||
LargeObjectFile: lo,
|
||||
bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)),
|
||||
}
|
||||
}
|
||||
return lo
|
||||
}
|
||||
|
||||
type bufferedLargeObjectFile struct {
|
||||
LargeObjectFile
|
||||
bw *bufio.Writer
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Close() error {
|
||||
err := blo.bw.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return blo.LargeObjectFile.Close()
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
|
||||
return blo.bw.Write(p)
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
|
||||
err := blo.bw.Flush()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return blo.LargeObjectFile.Seek(offset, whence)
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Size() int64 {
|
||||
return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
|
||||
}
|
||||
|
||||
func (blo *bufferedLargeObjectFile) Flush() error {
|
||||
err := blo.bw.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return blo.LargeObjectFile.Flush()
|
||||
}
|
168
vendor/github.com/ncw/swift/slo.go
generated
vendored
Normal file
168
vendor/github.com/ncw/swift/slo.go
generated
vendored
Normal file
|
@ -0,0 +1,168 @@
|
|||
package swift
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
)
|
||||
|
||||
// StaticLargeObjectCreateFile represents an open static large object
|
||||
type StaticLargeObjectCreateFile struct {
|
||||
largeObjectCreateFile
|
||||
}
|
||||
|
||||
var SLONotSupported = errors.New("SLO not supported")
|
||||
|
||||
type swiftSegment struct {
|
||||
Path string `json:"path,omitempty"`
|
||||
Etag string `json:"etag,omitempty"`
|
||||
Size int64 `json:"size_bytes,omitempty"`
|
||||
// When uploading a manifest, the attributes must be named `path`, `etag` and `size_bytes`
|
||||
// but when querying the JSON content of a manifest with the `multipart-manifest=get`
|
||||
// parameter, Swift names those attributes `name`, `hash` and `bytes`.
|
||||
// We use all the different attributes names in this structure to be able to use
|
||||
// the same structure for both uploading and retrieving.
|
||||
Name string `json:"name,omitempty"`
|
||||
Hash string `json:"hash,omitempty"`
|
||||
Bytes int64 `json:"bytes,omitempty"`
|
||||
ContentType string `json:"content_type,omitempty"`
|
||||
LastModified string `json:"last_modified,omitempty"`
|
||||
}
|
||||
|
||||
// StaticLargeObjectCreateFile creates a static large object returning
|
||||
// an object which satisfies io.Writer, io.Seeker, io.Closer and
|
||||
// io.ReaderFrom. The flags are as passed to the largeObjectCreate
|
||||
// method.
|
||||
func (c *Connection) StaticLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
info, err := c.cachedQueryInfo()
|
||||
if err != nil || !info.SupportsSLO() {
|
||||
return nil, SLONotSupported
|
||||
}
|
||||
realMinChunkSize := info.SLOMinSegmentSize()
|
||||
if realMinChunkSize > opts.MinChunkSize {
|
||||
opts.MinChunkSize = realMinChunkSize
|
||||
}
|
||||
lo, err := c.largeObjectCreate(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return withBuffer(opts, &StaticLargeObjectCreateFile{
|
||||
largeObjectCreateFile: *lo,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// StaticLargeObjectCreate creates or truncates an existing static
|
||||
// large object returning a writeable object. This sets opts.Flags to
|
||||
// an appropriate value before calling StaticLargeObjectCreateFile
|
||||
func (c *Connection) StaticLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
|
||||
opts.Flags = os.O_TRUNC | os.O_CREATE
|
||||
return c.StaticLargeObjectCreateFile(opts)
|
||||
}
|
||||
|
||||
// StaticLargeObjectDelete deletes a static large object and all of its segments.
|
||||
func (c *Connection) StaticLargeObjectDelete(container string, path string) error {
|
||||
info, err := c.cachedQueryInfo()
|
||||
if err != nil || !info.SupportsSLO() {
|
||||
return SLONotSupported
|
||||
}
|
||||
return c.LargeObjectDelete(container, path)
|
||||
}
|
||||
|
||||
// StaticLargeObjectMove moves a static large object from srcContainer, srcObjectName to dstContainer, dstObjectName
|
||||
func (c *Connection) StaticLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
|
||||
swiftInfo, err := c.cachedQueryInfo()
|
||||
if err != nil || !swiftInfo.SupportsSLO() {
|
||||
return SLONotSupported
|
||||
}
|
||||
info, headers, err := c.Object(srcContainer, srcObjectName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
container, segments, err := c.getAllSegments(srcContainer, srcObjectName, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.createSLOManifest(dstContainer, dstObjectName, info.ContentType, container, segments); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createSLOManifest creates a static large object manifest
|
||||
func (c *Connection) createSLOManifest(container string, path string, contentType string, segmentContainer string, segments []Object) error {
|
||||
sloSegments := make([]swiftSegment, len(segments))
|
||||
for i, segment := range segments {
|
||||
sloSegments[i].Path = fmt.Sprintf("%s/%s", segmentContainer, segment.Name)
|
||||
sloSegments[i].Etag = segment.Hash
|
||||
sloSegments[i].Size = segment.Bytes
|
||||
}
|
||||
|
||||
content, err := json.Marshal(sloSegments)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
values := url.Values{}
|
||||
values.Set("multipart-manifest", "put")
|
||||
if _, err := c.objectPut(container, path, bytes.NewBuffer(content), false, "", contentType, nil, values); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (file *StaticLargeObjectCreateFile) Close() error {
|
||||
return file.Flush()
|
||||
}
|
||||
|
||||
func (file *StaticLargeObjectCreateFile) Flush() error {
|
||||
if err := file.conn.createSLOManifest(file.container, file.objectName, file.contentType, file.segmentContainer, file.segments); err != nil {
|
||||
return err
|
||||
}
|
||||
return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size())
|
||||
}
|
||||
|
||||
func (c *Connection) getAllSLOSegments(container, path string) (string, []Object, error) {
|
||||
var (
|
||||
segmentList []swiftSegment
|
||||
segments []Object
|
||||
segPath string
|
||||
segmentContainer string
|
||||
)
|
||||
|
||||
values := url.Values{}
|
||||
values.Set("multipart-manifest", "get")
|
||||
|
||||
file, _, err := c.objectOpen(container, path, true, nil, values)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
content, err := ioutil.ReadAll(file)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
json.Unmarshal(content, &segmentList)
|
||||
for _, segment := range segmentList {
|
||||
segmentContainer, segPath = parseFullPath(segment.Name[1:])
|
||||
segments = append(segments, Object{
|
||||
Name: segPath,
|
||||
Bytes: segment.Bytes,
|
||||
Hash: segment.Hash,
|
||||
})
|
||||
}
|
||||
|
||||
return segmentContainer, segments, nil
|
||||
}
|
469
vendor/github.com/ncw/swift/swift.go
generated
vendored
469
vendor/github.com/ncw/swift/swift.go
generated
vendored
|
@ -11,9 +11,11 @@ import (
|
|||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"mime"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -33,6 +35,17 @@ const (
|
|||
allObjectsChanLimit = 1000 // ...when fetching to a channel
|
||||
)
|
||||
|
||||
// ObjectType is the type of the swift object, regular, static large,
|
||||
// or dynamic large.
|
||||
type ObjectType int
|
||||
|
||||
// Values that ObjectType can take
|
||||
const (
|
||||
RegularObjectType ObjectType = iota
|
||||
StaticLargeObjectType
|
||||
DynamicLargeObjectType
|
||||
)
|
||||
|
||||
// Connection holds the details of the connection to the swift server.
|
||||
//
|
||||
// You need to provide UserName, ApiKey and AuthUrl when you create a
|
||||
|
@ -86,6 +99,7 @@ type Connection struct {
|
|||
Domain string // User's domain name
|
||||
DomainId string // User's domain Id
|
||||
UserName string // UserName for api
|
||||
UserId string // User Id
|
||||
ApiKey string // Key for api access
|
||||
AuthUrl string // Auth URL
|
||||
Retries int // Retries on error (default is 3)
|
||||
|
@ -108,6 +122,139 @@ type Connection struct {
|
|||
client *http.Client
|
||||
Auth Authenticator `json:"-" xml:"-"` // the current authenticator
|
||||
authLock sync.Mutex // lock when R/W StorageUrl, AuthToken, Auth
|
||||
// swiftInfo is filled after QueryInfo is called
|
||||
swiftInfo SwiftInfo
|
||||
}
|
||||
|
||||
// setFromEnv reads the value that param points to (it must be a
|
||||
// pointer), if it isn't the zero value then it reads the environment
|
||||
// variable name passed in, parses it according to the type and writes
|
||||
// it to the pointer.
|
||||
func setFromEnv(param interface{}, name string) (err error) {
|
||||
val := os.Getenv(name)
|
||||
if val == "" {
|
||||
return
|
||||
}
|
||||
switch result := param.(type) {
|
||||
case *string:
|
||||
if *result == "" {
|
||||
*result = val
|
||||
}
|
||||
case *int:
|
||||
if *result == 0 {
|
||||
*result, err = strconv.Atoi(val)
|
||||
}
|
||||
case *bool:
|
||||
if *result == false {
|
||||
*result, err = strconv.ParseBool(val)
|
||||
}
|
||||
case *time.Duration:
|
||||
if *result == 0 {
|
||||
*result, err = time.ParseDuration(val)
|
||||
}
|
||||
case *EndpointType:
|
||||
if *result == EndpointType("") {
|
||||
*result = EndpointType(val)
|
||||
}
|
||||
default:
|
||||
return newErrorf(0, "can't set var of type %T", param)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ApplyEnvironment reads environment variables and applies them to
|
||||
// the Connection structure. It won't overwrite any parameters which
|
||||
// are already set in the Connection struct.
|
||||
//
|
||||
// To make a new Connection object entirely from the environment you
|
||||
// would do:
|
||||
//
|
||||
// c := new(Connection)
|
||||
// err := c.ApplyEnvironment()
|
||||
// if err != nil { log.Fatal(err) }
|
||||
//
|
||||
// The naming of these variables follows the official Openstack naming
|
||||
// scheme so it should be compatible with OpenStack rc files.
|
||||
//
|
||||
// For v1 authentication (obsolete)
|
||||
// ST_AUTH - Auth URL
|
||||
// ST_USER - UserName for api
|
||||
// ST_KEY - Key for api access
|
||||
//
|
||||
// For v2 authentication
|
||||
// OS_AUTH_URL - Auth URL
|
||||
// OS_USERNAME - UserName for api
|
||||
// OS_PASSWORD - Key for api access
|
||||
// OS_TENANT_NAME - Name of the tenant
|
||||
// OS_TENANT_ID - Id of the tenant
|
||||
// OS_REGION_NAME - Region to use - default is use first region
|
||||
//
|
||||
// For v3 authentication
|
||||
// OS_AUTH_URL - Auth URL
|
||||
// OS_USERNAME - UserName for api
|
||||
// OS_USER_ID - User Id
|
||||
// OS_PASSWORD - Key for api access
|
||||
// OS_USER_DOMAIN_NAME - User's domain name
|
||||
// OS_USER_DOMAIN_ID - User's domain Id
|
||||
// OS_PROJECT_NAME - Name of the project
|
||||
// OS_PROJECT_DOMAIN_NAME - Name of the tenant's domain, only needed if it differs from the user domain
|
||||
// OS_PROJECT_DOMAIN_ID - Id of the tenant's domain, only needed if it differs the from user domain
|
||||
// OS_TRUST_ID - If of the trust
|
||||
// OS_REGION_NAME - Region to use - default is use first region
|
||||
//
|
||||
// Other
|
||||
// OS_ENDPOINT_TYPE - Endpoint type public, internal or admin
|
||||
// ST_AUTH_VERSION - Choose auth version - 1, 2 or 3 or leave at 0 for autodetect
|
||||
//
|
||||
// For manual authentication
|
||||
// OS_STORAGE_URL - storage URL from alternate authentication
|
||||
// OS_AUTH_TOKEN - Auth Token from alternate authentication
|
||||
//
|
||||
// Library specific
|
||||
// GOSWIFT_RETRIES - Retries on error (default is 3)
|
||||
// GOSWIFT_USER_AGENT - HTTP User agent (default goswift/1.0)
|
||||
// GOSWIFT_CONNECT_TIMEOUT - Connect channel timeout with unit, eg "10s", "100ms" (default "10s")
|
||||
// GOSWIFT_TIMEOUT - Data channel timeout with unit, eg "10s", "100ms" (default "60s")
|
||||
// GOSWIFT_INTERNAL - Set this to "true" to use the the internal network (obsolete - use OS_ENDPOINT_TYPE)
|
||||
func (c *Connection) ApplyEnvironment() (err error) {
|
||||
for _, item := range []struct {
|
||||
result interface{}
|
||||
name string
|
||||
}{
|
||||
// Environment variables - keep in same order as Connection
|
||||
{&c.Domain, "OS_USER_DOMAIN_NAME"},
|
||||
{&c.DomainId, "OS_USER_DOMAIN_ID"},
|
||||
{&c.UserName, "OS_USERNAME"},
|
||||
{&c.UserId, "OS_USER_ID"},
|
||||
{&c.ApiKey, "OS_PASSWORD"},
|
||||
{&c.AuthUrl, "OS_AUTH_URL"},
|
||||
{&c.Retries, "GOSWIFT_RETRIES"},
|
||||
{&c.UserAgent, "GOSWIFT_USER_AGENT"},
|
||||
{&c.ConnectTimeout, "GOSWIFT_CONNECT_TIMEOUT"},
|
||||
{&c.Timeout, "GOSWIFT_TIMEOUT"},
|
||||
{&c.Region, "OS_REGION_NAME"},
|
||||
{&c.AuthVersion, "ST_AUTH_VERSION"},
|
||||
{&c.Internal, "GOSWIFT_INTERNAL"},
|
||||
{&c.Tenant, "OS_TENANT_NAME"}, //v2
|
||||
{&c.Tenant, "OS_PROJECT_NAME"}, // v3
|
||||
{&c.TenantId, "OS_TENANT_ID"},
|
||||
{&c.EndpointType, "OS_ENDPOINT_TYPE"},
|
||||
{&c.TenantDomain, "OS_PROJECT_DOMAIN_NAME"},
|
||||
{&c.TenantDomainId, "OS_PROJECT_DOMAIN_ID"},
|
||||
{&c.TrustId, "OS_TRUST_ID"},
|
||||
{&c.StorageUrl, "OS_STORAGE_URL"},
|
||||
{&c.AuthToken, "OS_AUTH_TOKEN"},
|
||||
// v1 auth alternatives
|
||||
{&c.ApiKey, "ST_KEY"},
|
||||
{&c.UserName, "ST_USER"},
|
||||
{&c.AuthUrl, "ST_AUTH"},
|
||||
} {
|
||||
err = setFromEnv(item.result, item.name)
|
||||
if err != nil {
|
||||
return newErrorf(0, "failed to read env var %q: %v", item.name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Error - all errors generated by this package are of this type. Other error
|
||||
|
@ -140,6 +287,7 @@ type errorMap map[int]error
|
|||
|
||||
var (
|
||||
// Specific Errors you might want to check for equality
|
||||
NotModified = newError(304, "Not Modified")
|
||||
BadRequest = newError(400, "Bad Request")
|
||||
AuthorizationFailed = newError(401, "Authorization Failed")
|
||||
ContainerNotFound = newError(404, "Container Not Found")
|
||||
|
@ -167,6 +315,7 @@ var (
|
|||
|
||||
// Mappings for object errors
|
||||
objectErrorMap = errorMap{
|
||||
304: NotModified,
|
||||
400: BadRequest,
|
||||
403: Forbidden,
|
||||
404: ObjectNotFound,
|
||||
|
@ -184,15 +333,32 @@ func checkClose(c io.Closer, err *error) {
|
|||
}
|
||||
}
|
||||
|
||||
// drainAndClose discards all data from rd and closes it.
|
||||
// If an error occurs during Read, it is discarded.
|
||||
func drainAndClose(rd io.ReadCloser, err *error) {
|
||||
if rd == nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, _ = io.Copy(ioutil.Discard, rd)
|
||||
cerr := rd.Close()
|
||||
if err != nil && *err == nil {
|
||||
*err = cerr
|
||||
}
|
||||
}
|
||||
|
||||
// parseHeaders checks a response for errors and translates into
|
||||
// standard errors if necessary.
|
||||
// standard errors if necessary. If an error is returned, resp.Body
|
||||
// has been drained and closed.
|
||||
func (c *Connection) parseHeaders(resp *http.Response, errorMap errorMap) error {
|
||||
if errorMap != nil {
|
||||
if err, ok := errorMap[resp.StatusCode]; ok {
|
||||
drainAndClose(resp.Body, nil)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
drainAndClose(resp.Body, nil)
|
||||
return newErrorf(resp.StatusCode, "HTTP Error: %d: %s", resp.StatusCode, resp.Status)
|
||||
}
|
||||
return nil
|
||||
|
@ -305,13 +471,14 @@ again:
|
|||
}
|
||||
if req != nil {
|
||||
timer := time.NewTimer(c.ConnectTimeout)
|
||||
defer timer.Stop()
|
||||
var resp *http.Response
|
||||
resp, err = c.doTimeoutRequest(timer, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
checkClose(resp.Body, &err)
|
||||
drainAndClose(resp.Body, &err)
|
||||
// Flush the auth connection - we don't want to keep
|
||||
// it open if keepalives were enabled
|
||||
flushKeepaliveConnections(c.Transport)
|
||||
|
@ -406,6 +573,24 @@ func (c *Connection) authenticated() bool {
|
|||
// the enabled middlewares and their configuration
|
||||
type SwiftInfo map[string]interface{}
|
||||
|
||||
func (i SwiftInfo) SupportsBulkDelete() bool {
|
||||
_, val := i["bulk_delete"]
|
||||
return val
|
||||
}
|
||||
|
||||
func (i SwiftInfo) SupportsSLO() bool {
|
||||
_, val := i["slo"]
|
||||
return val
|
||||
}
|
||||
|
||||
func (i SwiftInfo) SLOMinSegmentSize() int64 {
|
||||
if slo, ok := i["slo"].(map[string]interface{}); ok {
|
||||
val, _ := slo["min_segment_size"].(float64)
|
||||
return int64(val)
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
// Discover Swift configuration by doing a request against /info
|
||||
func (c *Connection) QueryInfo() (infos SwiftInfo, err error) {
|
||||
infoUrl, err := url.Parse(c.StorageUrl)
|
||||
|
@ -413,14 +598,36 @@ func (c *Connection) QueryInfo() (infos SwiftInfo, err error) {
|
|||
return nil, err
|
||||
}
|
||||
infoUrl.Path = path.Join(infoUrl.Path, "..", "..", "info")
|
||||
resp, err := http.Get(infoUrl.String())
|
||||
resp, err := c.client.Get(infoUrl.String())
|
||||
if err == nil {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
drainAndClose(resp.Body, nil)
|
||||
return nil, fmt.Errorf("Invalid status code for info request: %d", resp.StatusCode)
|
||||
}
|
||||
err = readJson(resp, &infos)
|
||||
if err == nil {
|
||||
c.authLock.Lock()
|
||||
c.swiftInfo = infos
|
||||
c.authLock.Unlock()
|
||||
}
|
||||
return infos, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (c *Connection) cachedQueryInfo() (infos SwiftInfo, err error) {
|
||||
c.authLock.Lock()
|
||||
infos = c.swiftInfo
|
||||
c.authLock.Unlock()
|
||||
if infos == nil {
|
||||
infos, err = c.QueryInfo()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// RequestOpts contains parameters for Connection.storage.
|
||||
type RequestOpts struct {
|
||||
Container string
|
||||
|
@ -444,6 +651,7 @@ type RequestOpts struct {
|
|||
// Any other parameters (if not None) are added to the targetUrl
|
||||
//
|
||||
// Returns a response or an error. If response is returned then
|
||||
// the resp.Body must be read completely and
|
||||
// resp.Body.Close() must be called on it, unless noResponse is set in
|
||||
// which case the body will be closed in this function
|
||||
//
|
||||
|
@ -484,6 +692,7 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response,
|
|||
URL.RawQuery = p.Parameters.Encode()
|
||||
}
|
||||
timer := time.NewTimer(c.ConnectTimeout)
|
||||
defer timer.Stop()
|
||||
reader := p.Body
|
||||
if reader != nil {
|
||||
reader = newWatchdogReader(reader, c.Timeout, timer)
|
||||
|
@ -518,7 +727,7 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response,
|
|||
}
|
||||
// Check to see if token has expired
|
||||
if resp.StatusCode == 401 && retries > 0 {
|
||||
_ = resp.Body.Close()
|
||||
drainAndClose(resp.Body, nil)
|
||||
c.UnAuthenticate()
|
||||
retries--
|
||||
} else {
|
||||
|
@ -527,12 +736,12 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response,
|
|||
}
|
||||
|
||||
if err = c.parseHeaders(resp, p.ErrorMap); err != nil {
|
||||
_ = resp.Body.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
headers = readHeaders(resp)
|
||||
if p.NoResponse {
|
||||
err = resp.Body.Close()
|
||||
var err error
|
||||
drainAndClose(resp.Body, &err)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -574,7 +783,7 @@ func (c *Connection) storage(p RequestOpts) (resp *http.Response, headers Header
|
|||
//
|
||||
// Closes the response when done
|
||||
func readLines(resp *http.Response) (lines []string, err error) {
|
||||
defer checkClose(resp.Body, &err)
|
||||
defer drainAndClose(resp.Body, &err)
|
||||
reader := bufio.NewReader(resp.Body)
|
||||
buffer := bytes.NewBuffer(make([]byte, 0, 128))
|
||||
var part []byte
|
||||
|
@ -599,7 +808,7 @@ func readLines(resp *http.Response) (lines []string, err error) {
|
|||
//
|
||||
// Closes the response when done
|
||||
func readJson(resp *http.Response, result interface{}) (err error) {
|
||||
defer checkClose(resp.Body, &err)
|
||||
defer drainAndClose(resp.Body, &err)
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
return decoder.Decode(result)
|
||||
}
|
||||
|
@ -796,14 +1005,15 @@ func (c *Connection) ObjectNames(container string, opts *ObjectsOpts) ([]string,
|
|||
|
||||
// Object contains information about an object
|
||||
type Object struct {
|
||||
Name string `json:"name"` // object name
|
||||
ContentType string `json:"content_type"` // eg application/directory
|
||||
Bytes int64 `json:"bytes"` // size in bytes
|
||||
ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server
|
||||
LastModified time.Time // Last modified time converted to a time.Time
|
||||
Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e"
|
||||
PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist
|
||||
SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories"
|
||||
Name string `json:"name"` // object name
|
||||
ContentType string `json:"content_type"` // eg application/directory
|
||||
Bytes int64 `json:"bytes"` // size in bytes
|
||||
ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server
|
||||
LastModified time.Time // Last modified time converted to a time.Time
|
||||
Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e"
|
||||
PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist
|
||||
SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories"
|
||||
ObjectType ObjectType // type of this object
|
||||
}
|
||||
|
||||
// Objects returns a slice of Object with information about each
|
||||
|
@ -1141,6 +1351,19 @@ func (file *ObjectCreateFile) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Headers returns the response headers from the created object if the upload
|
||||
// has been completed. The Close() method must be called on an ObjectCreateFile
|
||||
// before this method.
|
||||
func (file *ObjectCreateFile) Headers() (Headers, error) {
|
||||
// error out if upload is not complete.
|
||||
select {
|
||||
case <-file.done:
|
||||
default:
|
||||
return nil, fmt.Errorf("Cannot get metadata, object upload failed or has not yet completed.")
|
||||
}
|
||||
return file.headers, nil
|
||||
}
|
||||
|
||||
// Check it satisfies the interface
|
||||
var _ io.WriteCloser = &ObjectCreateFile{}
|
||||
|
||||
|
@ -1202,7 +1425,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
|||
}
|
||||
// Run the PUT in the background piping it data
|
||||
go func() {
|
||||
file.resp, file.headers, file.err = c.storage(RequestOpts{
|
||||
opts := RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "PUT",
|
||||
|
@ -1210,7 +1433,8 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
|||
Body: pipeReader,
|
||||
NoResponse: true,
|
||||
ErrorMap: objectErrorMap,
|
||||
})
|
||||
}
|
||||
file.resp, file.headers, file.err = c.storage(opts)
|
||||
// Signal finished
|
||||
pipeReader.Close()
|
||||
close(file.done)
|
||||
|
@ -1218,6 +1442,37 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Connection) objectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers, parameters url.Values) (headers Headers, err error) {
|
||||
extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h)
|
||||
hash := md5.New()
|
||||
var body io.Reader = contents
|
||||
if checkHash {
|
||||
body = io.TeeReader(contents, hash)
|
||||
}
|
||||
_, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "PUT",
|
||||
Headers: extraHeaders,
|
||||
Body: body,
|
||||
NoResponse: true,
|
||||
ErrorMap: objectErrorMap,
|
||||
Parameters: parameters,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if checkHash {
|
||||
receivedMd5 := strings.ToLower(headers["Etag"])
|
||||
calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil))
|
||||
if receivedMd5 != calculatedMd5 {
|
||||
err = ObjectCorrupted
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ObjectPut creates or updates the path in the container from
|
||||
// contents. contents should be an open io.Reader which will have all
|
||||
// its contents read.
|
||||
|
@ -1240,33 +1495,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
|
|||
// If contentType is set it will be used, otherwise one will be
|
||||
// guessed from objectName using mime.TypeByExtension
|
||||
func (c *Connection) ObjectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers) (headers Headers, err error) {
|
||||
extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h)
|
||||
hash := md5.New()
|
||||
var body io.Reader = contents
|
||||
if checkHash {
|
||||
body = io.TeeReader(contents, hash)
|
||||
}
|
||||
_, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "PUT",
|
||||
Headers: extraHeaders,
|
||||
Body: body,
|
||||
NoResponse: true,
|
||||
ErrorMap: objectErrorMap,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if checkHash {
|
||||
receivedMd5 := strings.ToLower(headers["Etag"])
|
||||
calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil))
|
||||
if receivedMd5 != calculatedMd5 {
|
||||
err = ObjectCorrupted
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
return c.objectPut(container, objectName, contents, checkHash, Hash, contentType, h, nil)
|
||||
}
|
||||
|
||||
// ObjectPutBytes creates an object from a []byte in a container.
|
||||
|
@ -1274,7 +1503,8 @@ func (c *Connection) ObjectPut(container string, objectName string, contents io.
|
|||
// This is a simplified interface which checks the MD5.
|
||||
func (c *Connection) ObjectPutBytes(container string, objectName string, contents []byte, contentType string) (err error) {
|
||||
buf := bytes.NewBuffer(contents)
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil)
|
||||
h := Headers{"Content-Length": strconv.Itoa(len(contents))}
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1283,7 +1513,8 @@ func (c *Connection) ObjectPutBytes(container string, objectName string, content
|
|||
// This is a simplified interface which checks the MD5
|
||||
func (c *Connection) ObjectPutString(container string, objectName string, contents string, contentType string) (err error) {
|
||||
buf := strings.NewReader(contents)
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil)
|
||||
h := Headers{"Content-Length": strconv.Itoa(len(contents))}
|
||||
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1303,10 +1534,14 @@ type ObjectOpenFile struct {
|
|||
lengthOk bool // whether length is valid
|
||||
length int64 // length of the object if read
|
||||
seeked bool // whether we have seeked this file or not
|
||||
overSeeked bool // set if we have seeked to the end or beyond
|
||||
}
|
||||
|
||||
// Read bytes from the object - see io.Reader
|
||||
func (file *ObjectOpenFile) Read(p []byte) (n int, err error) {
|
||||
if file.overSeeked {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n, err = file.body.Read(p)
|
||||
file.bytes += int64(n)
|
||||
file.pos += int64(n)
|
||||
|
@ -1330,6 +1565,7 @@ func (file *ObjectOpenFile) Read(p []byte) (n int, err error) {
|
|||
//
|
||||
// Seek(0, 1) will return the current file pointer.
|
||||
func (file *ObjectOpenFile) Seek(offset int64, whence int) (newPos int64, err error) {
|
||||
file.overSeeked = false
|
||||
switch whence {
|
||||
case 0: // relative to start
|
||||
newPos = offset
|
||||
|
@ -1340,6 +1576,10 @@ func (file *ObjectOpenFile) Seek(offset int64, whence int) (newPos int64, err er
|
|||
return file.pos, newError(0, "Length of file unknown so can't seek from end")
|
||||
}
|
||||
newPos = file.length + offset
|
||||
if offset >= 0 {
|
||||
file.overSeeked = true
|
||||
return
|
||||
}
|
||||
default:
|
||||
panic("Unknown whence in ObjectOpenFile.Seek")
|
||||
}
|
||||
|
@ -1419,6 +1659,57 @@ func (file *ObjectOpenFile) Close() (err error) {
|
|||
var _ io.ReadCloser = &ObjectOpenFile{}
|
||||
var _ io.Seeker = &ObjectOpenFile{}
|
||||
|
||||
func (c *Connection) objectOpenBase(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) {
|
||||
var resp *http.Response
|
||||
opts := RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "GET",
|
||||
ErrorMap: objectErrorMap,
|
||||
Headers: h,
|
||||
Parameters: parameters,
|
||||
}
|
||||
resp, headers, err = c.storage(opts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set
|
||||
if checkHash && headers.IsLargeObject() {
|
||||
// log.Printf("swift: turning off md5 checking on object with manifest %v", objectName)
|
||||
checkHash = false
|
||||
}
|
||||
file = &ObjectOpenFile{
|
||||
connection: c,
|
||||
container: container,
|
||||
objectName: objectName,
|
||||
headers: h,
|
||||
resp: resp,
|
||||
checkHash: checkHash,
|
||||
body: resp.Body,
|
||||
}
|
||||
if checkHash {
|
||||
file.hash = md5.New()
|
||||
file.body = io.TeeReader(resp.Body, file.hash)
|
||||
}
|
||||
// Read Content-Length
|
||||
if resp.Header.Get("Content-Length") != "" {
|
||||
file.length, err = getInt64FromHeader(resp, "Content-Length")
|
||||
file.lengthOk = (err == nil)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Connection) objectOpen(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) {
|
||||
err = withLORetry(0, func() (Headers, int64, error) {
|
||||
file, headers, err = c.objectOpenBase(container, objectName, checkHash, h, parameters)
|
||||
if err != nil {
|
||||
return headers, 0, err
|
||||
}
|
||||
return headers, file.length, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// ObjectOpen returns an ObjectOpenFile for reading the contents of
|
||||
// the object. This satisfies the io.ReadCloser and the io.Seeker
|
||||
// interfaces.
|
||||
|
@ -1443,41 +1734,7 @@ var _ io.Seeker = &ObjectOpenFile{}
|
|||
//
|
||||
// headers["Content-Type"] will give the content type if desired.
|
||||
func (c *Connection) ObjectOpen(container string, objectName string, checkHash bool, h Headers) (file *ObjectOpenFile, headers Headers, err error) {
|
||||
var resp *http.Response
|
||||
resp, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
ObjectName: objectName,
|
||||
Operation: "GET",
|
||||
ErrorMap: objectErrorMap,
|
||||
Headers: h,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set
|
||||
if checkHash && (headers["X-Object-Manifest"] != "" || headers["X-Static-Large-Object"] != "") {
|
||||
// log.Printf("swift: turning off md5 checking on object with manifest %v", objectName)
|
||||
checkHash = false
|
||||
}
|
||||
file = &ObjectOpenFile{
|
||||
connection: c,
|
||||
container: container,
|
||||
objectName: objectName,
|
||||
headers: h,
|
||||
resp: resp,
|
||||
checkHash: checkHash,
|
||||
body: resp.Body,
|
||||
}
|
||||
if checkHash {
|
||||
file.hash = md5.New()
|
||||
file.body = io.TeeReader(resp.Body, file.hash)
|
||||
}
|
||||
// Read Content-Length
|
||||
if resp.Header.Get("Content-Length") != "" {
|
||||
file.length, err = getInt64FromHeader(resp, "Content-Length")
|
||||
file.lengthOk = (err == nil)
|
||||
}
|
||||
return
|
||||
return c.objectOpen(container, objectName, checkHash, h, nil)
|
||||
}
|
||||
|
||||
// ObjectGet gets the object into the io.Writer contents.
|
||||
|
@ -1580,19 +1837,10 @@ type BulkDeleteResult struct {
|
|||
Headers Headers // Response HTTP headers.
|
||||
}
|
||||
|
||||
// BulkDelete deletes multiple objectNames from container in one operation.
|
||||
//
|
||||
// Some servers may not accept bulk-delete requests since bulk-delete is
|
||||
// an optional feature of swift - these will return the Forbidden error.
|
||||
//
|
||||
// See also:
|
||||
// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html
|
||||
// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html
|
||||
func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) {
|
||||
func (c *Connection) doBulkDelete(objects []string) (result BulkDeleteResult, err error) {
|
||||
var buffer bytes.Buffer
|
||||
for _, s := range objectNames {
|
||||
buffer.WriteString(fmt.Sprintf("/%s/%s\n", container,
|
||||
url.QueryEscape(s)))
|
||||
for _, s := range objects {
|
||||
buffer.WriteString(url.QueryEscape(s) + "\n")
|
||||
}
|
||||
resp, headers, err := c.storage(RequestOpts{
|
||||
Operation: "DELETE",
|
||||
|
@ -1633,6 +1881,22 @@ func (c *Connection) BulkDelete(container string, objectNames []string) (result
|
|||
return
|
||||
}
|
||||
|
||||
// BulkDelete deletes multiple objectNames from container in one operation.
|
||||
//
|
||||
// Some servers may not accept bulk-delete requests since bulk-delete is
|
||||
// an optional feature of swift - these will return the Forbidden error.
|
||||
//
|
||||
// See also:
|
||||
// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html
|
||||
// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html
|
||||
func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) {
|
||||
fullPaths := make([]string, len(objectNames))
|
||||
for i, name := range objectNames {
|
||||
fullPaths[i] = fmt.Sprintf("/%s/%s", container, name)
|
||||
}
|
||||
return c.doBulkDelete(fullPaths)
|
||||
}
|
||||
|
||||
// BulkUploadResult stores results of BulkUpload().
|
||||
//
|
||||
// Individual errors may (or may not) be returned by Errors.
|
||||
|
@ -1716,6 +1980,17 @@ func (c *Connection) BulkUpload(uploadPath string, dataStream io.Reader, format
|
|||
//
|
||||
// Use headers.ObjectMetadata() to read the metadata in the Headers.
|
||||
func (c *Connection) Object(container string, objectName string) (info Object, headers Headers, err error) {
|
||||
err = withLORetry(0, func() (Headers, int64, error) {
|
||||
info, headers, err = c.objectBase(container, objectName)
|
||||
if err != nil {
|
||||
return headers, 0, err
|
||||
}
|
||||
return headers, info.Bytes, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Connection) objectBase(container string, objectName string) (info Object, headers Headers, err error) {
|
||||
var resp *http.Response
|
||||
resp, headers, err = c.storage(RequestOpts{
|
||||
Container: container,
|
||||
|
@ -1756,6 +2031,12 @@ func (c *Connection) Object(container string, objectName string) (info Object, h
|
|||
}
|
||||
|
||||
info.Hash = resp.Header.Get("Etag")
|
||||
if resp.Header.Get("X-Object-Manifest") != "" {
|
||||
info.ObjectType = DynamicLargeObjectType
|
||||
} else if resp.Header.Get("X-Static-Large-Object") != "" {
|
||||
info.ObjectType = StaticLargeObjectType
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
274
vendor/github.com/ncw/swift/swifttest/server.go
generated
vendored
274
vendor/github.com/ncw/swift/swifttest/server.go
generated
vendored
|
@ -21,6 +21,7 @@ import (
|
|||
"mime"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"path"
|
||||
"regexp"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -39,21 +41,28 @@ const (
|
|||
TEST_ACCOUNT = "swifttest"
|
||||
)
|
||||
|
||||
type HandlerOverrideFunc func(w http.ResponseWriter, r *http.Request, recorder *httptest.ResponseRecorder)
|
||||
|
||||
type SwiftServer struct {
|
||||
// `sync/atomic` expects the first word in an allocated struct to be 64-bit
|
||||
// aligned on both ARM and x86-32.
|
||||
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more details.
|
||||
reqId int64
|
||||
sync.RWMutex
|
||||
t *testing.T
|
||||
reqId int
|
||||
mu sync.Mutex
|
||||
Listener net.Listener
|
||||
AuthURL string
|
||||
URL string
|
||||
Accounts map[string]*account
|
||||
Sessions map[string]*session
|
||||
override map[string]HandlerOverrideFunc
|
||||
}
|
||||
|
||||
// The Folder type represents a container stored in an account
|
||||
type Folder struct {
|
||||
Count int `json:"count"`
|
||||
Bytes int `json:"bytes"`
|
||||
Count int64 `json:"count"`
|
||||
Bytes int64 `json:"bytes"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
|
@ -96,13 +105,16 @@ type metadata struct {
|
|||
}
|
||||
|
||||
type account struct {
|
||||
sync.RWMutex
|
||||
swift.Account
|
||||
metadata
|
||||
password string
|
||||
Containers map[string]*container
|
||||
password string
|
||||
ContainersLock sync.RWMutex
|
||||
Containers map[string]*container
|
||||
}
|
||||
|
||||
type object struct {
|
||||
sync.RWMutex
|
||||
metadata
|
||||
name string
|
||||
mtime time.Time
|
||||
|
@ -112,11 +124,31 @@ type object struct {
|
|||
}
|
||||
|
||||
type container struct {
|
||||
// `sync/atomic` expects the first word in an allocated struct to be 64-bit
|
||||
// aligned on both ARM and x86-32.
|
||||
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more details.
|
||||
bytes int64
|
||||
sync.RWMutex
|
||||
metadata
|
||||
name string
|
||||
ctime time.Time
|
||||
objects map[string]*object
|
||||
bytes int
|
||||
}
|
||||
|
||||
type segment struct {
|
||||
Path string `json:"path,omitempty"`
|
||||
Hash string `json:"hash,omitempty"`
|
||||
Size int64 `json:"size_bytes,omitempty"`
|
||||
// When uploading a manifest, the attributes must be named `path`, `hash` and `size`
|
||||
// but when querying the JSON content of a manifest with the `multipart-manifest=get`
|
||||
// parameter, Swift names those attributes `name`, `etag` and `bytes`.
|
||||
// We use all the different attributes names in this structure to be able to use
|
||||
// the same structure for both uploading and retrieving.
|
||||
Name string `json:"name,omitempty"`
|
||||
Etag string `json:"etag,omitempty"`
|
||||
Bytes int64 `json:"bytes,omitempty"`
|
||||
ContentType string `json:"content_type,omitempty"`
|
||||
LastModified string `json:"last_modified,omitempty"`
|
||||
}
|
||||
|
||||
// A resource encapsulates the subject of an HTTP request.
|
||||
|
@ -179,9 +211,12 @@ func (m metadata) getMetadata(a *action) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) {
|
||||
func (c *container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) {
|
||||
var tmp orderedObjects
|
||||
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
// first get all matching objects and arrange them in alphabetical order.
|
||||
for _, obj := range c.objects {
|
||||
if strings.HasPrefix(obj.name, prefix) {
|
||||
|
@ -236,19 +271,23 @@ func (r containerResource) get(a *action) interface{} {
|
|||
fatalf(404, "NoSuchContainer", "The specified container does not exist")
|
||||
}
|
||||
|
||||
r.container.RLock()
|
||||
|
||||
delimiter := a.req.Form.Get("delimiter")
|
||||
marker := a.req.Form.Get("marker")
|
||||
prefix := a.req.Form.Get("prefix")
|
||||
format := a.req.URL.Query().Get("format")
|
||||
parent := a.req.Form.Get("path")
|
||||
|
||||
a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(r.container.bytes))
|
||||
a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(int(r.container.bytes)))
|
||||
a.w.Header().Set("X-Container-Object-Count", strconv.Itoa(len(r.container.objects)))
|
||||
r.container.getMetadata(a)
|
||||
|
||||
if a.req.Method == "HEAD" {
|
||||
r.container.RUnlock()
|
||||
return nil
|
||||
}
|
||||
r.container.RUnlock()
|
||||
|
||||
objects := r.container.list(delimiter, marker, prefix, parent)
|
||||
|
||||
|
@ -297,8 +336,10 @@ func (r containerResource) delete(a *action) interface{} {
|
|||
if len(b.objects) > 0 {
|
||||
fatalf(409, "Conflict", "The container you tried to delete is not empty")
|
||||
}
|
||||
a.user.Lock()
|
||||
delete(a.user.Containers, b.name)
|
||||
a.user.Account.Containers--
|
||||
a.user.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -319,8 +360,11 @@ func (r containerResource) put(a *action) interface{} {
|
|||
},
|
||||
}
|
||||
r.container.setMetadata(a, "container")
|
||||
|
||||
a.user.Lock()
|
||||
a.user.Containers[r.name] = r.container
|
||||
a.user.Account.Containers++
|
||||
a.user.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -330,10 +374,13 @@ func (r containerResource) post(a *action) interface{} {
|
|||
if r.container == nil {
|
||||
fatalf(400, "Method", "The resource could not be found.")
|
||||
} else {
|
||||
r.container.RLock()
|
||||
defer r.container.RUnlock()
|
||||
|
||||
r.container.setMetadata(a, "container")
|
||||
a.w.WriteHeader(201)
|
||||
jsonMarshal(a.w, Folder{
|
||||
Count: len(r.container.objects),
|
||||
Count: int64(len(r.container.objects)),
|
||||
Bytes: r.container.bytes,
|
||||
Name: r.container.name,
|
||||
})
|
||||
|
@ -388,10 +435,11 @@ func (obj *object) Key() Key {
|
|||
}
|
||||
|
||||
var metaHeaders = map[string]bool{
|
||||
"Content-Type": true,
|
||||
"Content-Encoding": true,
|
||||
"Content-Disposition": true,
|
||||
"X-Object-Manifest": true,
|
||||
"Content-Type": true,
|
||||
"Content-Encoding": true,
|
||||
"Content-Disposition": true,
|
||||
"X-Object-Manifest": true,
|
||||
"X-Static-Large-Object": true,
|
||||
}
|
||||
|
||||
var rangeRegexp = regexp.MustCompile("(bytes=)?([0-9]*)-([0-9]*)")
|
||||
|
@ -409,6 +457,9 @@ func (objr objectResource) get(a *action) interface{} {
|
|||
fatalf(404, "Not Found", "The resource could not be found.")
|
||||
}
|
||||
|
||||
obj.RLock()
|
||||
defer obj.RUnlock()
|
||||
|
||||
h := a.w.Header()
|
||||
// add metadata
|
||||
obj.getMetadata(a)
|
||||
|
@ -433,7 +484,9 @@ func (objr objectResource) get(a *action) interface{} {
|
|||
if manifest, ok := obj.meta["X-Object-Manifest"]; ok {
|
||||
var segments []io.Reader
|
||||
components := strings.SplitN(manifest[0], "/", 2)
|
||||
a.user.RLock()
|
||||
segContainer := a.user.Containers[components[0]]
|
||||
a.user.RUnlock()
|
||||
prefix := components[1]
|
||||
resp := segContainer.list("", "", prefix, "")
|
||||
sum := md5.New()
|
||||
|
@ -453,19 +506,54 @@ func (objr objectResource) get(a *action) interface{} {
|
|||
}
|
||||
etag = sum.Sum(nil)
|
||||
if end == -1 {
|
||||
end = size
|
||||
end = size - 1
|
||||
}
|
||||
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start))
|
||||
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1))
|
||||
} else if value, ok := obj.meta["X-Static-Large-Object"]; ok && value[0] == "True" && a.req.URL.Query().Get("multipart-manifest") != "get" {
|
||||
var segments []io.Reader
|
||||
var segmentList []segment
|
||||
json.Unmarshal(obj.data, &segmentList)
|
||||
cursor := 0
|
||||
size := 0
|
||||
sum := md5.New()
|
||||
for _, segment := range segmentList {
|
||||
components := strings.SplitN(segment.Name[1:], "/", 2)
|
||||
a.user.RLock()
|
||||
segContainer := a.user.Containers[components[0]]
|
||||
a.user.RUnlock()
|
||||
objectName := components[1]
|
||||
segObject := segContainer.objects[objectName]
|
||||
length := len(segObject.data)
|
||||
size += length
|
||||
sum.Write([]byte(hex.EncodeToString(segObject.checksum)))
|
||||
if start >= cursor+length {
|
||||
continue
|
||||
}
|
||||
segments = append(segments, bytes.NewReader(segObject.data[max(0, start-cursor):]))
|
||||
cursor += length
|
||||
}
|
||||
etag = sum.Sum(nil)
|
||||
if end == -1 {
|
||||
end = size - 1
|
||||
}
|
||||
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1))
|
||||
} else {
|
||||
if end == -1 {
|
||||
end = len(obj.data)
|
||||
end = len(obj.data) - 1
|
||||
}
|
||||
etag = obj.checksum
|
||||
reader = bytes.NewReader(obj.data[start:end])
|
||||
reader = bytes.NewReader(obj.data[start : end+1])
|
||||
}
|
||||
|
||||
h.Set("Content-Length", fmt.Sprint(end-start))
|
||||
h.Set("ETag", hex.EncodeToString(etag))
|
||||
etagHex := hex.EncodeToString(etag)
|
||||
|
||||
if a.req.Header.Get("If-None-Match") == etagHex {
|
||||
a.w.WriteHeader(http.StatusNotModified)
|
||||
return nil
|
||||
}
|
||||
|
||||
h.Set("Content-Length", fmt.Sprint(end-start+1))
|
||||
h.Set("ETag", etagHex)
|
||||
h.Set("Last-Modified", obj.mtime.Format(http.TimeFormat))
|
||||
|
||||
if a.req.Method == "HEAD" {
|
||||
|
@ -514,10 +602,10 @@ func (objr objectResource) put(a *action) interface{} {
|
|||
meta: make(http.Header),
|
||||
},
|
||||
}
|
||||
a.user.Objects++
|
||||
atomic.AddInt64(&a.user.Objects, 1)
|
||||
} else {
|
||||
objr.container.bytes -= len(obj.data)
|
||||
a.user.BytesUsed -= int64(len(obj.data))
|
||||
atomic.AddInt64(&objr.container.bytes, -int64(len(obj.data)))
|
||||
atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj.data)))
|
||||
}
|
||||
|
||||
var content_type string
|
||||
|
@ -528,15 +616,39 @@ func (objr objectResource) put(a *action) interface{} {
|
|||
}
|
||||
}
|
||||
|
||||
if a.req.URL.Query().Get("multipart-manifest") == "put" {
|
||||
// TODO: check the content of the SLO
|
||||
a.req.Header.Set("X-Static-Large-Object", "True")
|
||||
|
||||
var segments []segment
|
||||
json.Unmarshal(data, &segments)
|
||||
for i := range segments {
|
||||
segments[i].Name = "/" + segments[i].Path
|
||||
segments[i].Path = ""
|
||||
segments[i].Hash = segments[i].Etag
|
||||
segments[i].Etag = ""
|
||||
segments[i].Bytes = segments[i].Size
|
||||
segments[i].Size = 0
|
||||
}
|
||||
|
||||
data, _ = json.Marshal(segments)
|
||||
sum = md5.New()
|
||||
sum.Write(data)
|
||||
gotHash = sum.Sum(nil)
|
||||
}
|
||||
|
||||
// PUT request has been successful - save data and metadata
|
||||
obj.setMetadata(a, "object")
|
||||
obj.content_type = content_type
|
||||
obj.data = data
|
||||
obj.checksum = gotHash
|
||||
obj.mtime = time.Now().UTC()
|
||||
objr.container.Lock()
|
||||
objr.container.objects[objr.name] = obj
|
||||
objr.container.bytes += len(data)
|
||||
a.user.BytesUsed += int64(len(data))
|
||||
objr.container.bytes += int64(len(data))
|
||||
objr.container.Unlock()
|
||||
|
||||
atomic.AddInt64(&a.user.BytesUsed, int64(len(data)))
|
||||
|
||||
h := a.w.Header()
|
||||
h.Set("ETag", hex.EncodeToString(obj.checksum))
|
||||
|
@ -549,14 +661,25 @@ func (objr objectResource) delete(a *action) interface{} {
|
|||
fatalf(404, "NoSuchKey", "The specified key does not exist.")
|
||||
}
|
||||
|
||||
objr.container.bytes -= len(objr.object.data)
|
||||
a.user.BytesUsed -= int64(len(objr.object.data))
|
||||
objr.container.Lock()
|
||||
defer objr.container.Unlock()
|
||||
|
||||
objr.object.Lock()
|
||||
defer objr.object.Unlock()
|
||||
|
||||
objr.container.bytes -= int64(len(objr.object.data))
|
||||
delete(objr.container.objects, objr.name)
|
||||
a.user.Objects--
|
||||
|
||||
atomic.AddInt64(&a.user.BytesUsed, -int64(len(objr.object.data)))
|
||||
atomic.AddInt64(&a.user.Objects, -1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (objr objectResource) post(a *action) interface{} {
|
||||
objr.object.Lock()
|
||||
defer objr.object.Unlock()
|
||||
|
||||
obj := objr.object
|
||||
obj.setMetadata(a, "object")
|
||||
return nil
|
||||
|
@ -568,6 +691,9 @@ func (objr objectResource) copy(a *action) interface{} {
|
|||
}
|
||||
|
||||
obj := objr.object
|
||||
obj.RLock()
|
||||
defer obj.RUnlock()
|
||||
|
||||
destination := a.req.Header.Get("Destination")
|
||||
if destination == "" {
|
||||
fatalf(400, "Bad Request", "You must provide a Destination header")
|
||||
|
@ -590,29 +716,38 @@ func (objr objectResource) copy(a *action) interface{} {
|
|||
meta: make(http.Header),
|
||||
},
|
||||
}
|
||||
a.user.Objects++
|
||||
atomic.AddInt64(&a.user.Objects, 1)
|
||||
} else {
|
||||
obj2 = objr2.object
|
||||
objr2.container.bytes -= len(obj2.data)
|
||||
a.user.BytesUsed -= int64(len(obj2.data))
|
||||
atomic.AddInt64(&objr2.container.bytes, -int64(len(obj2.data)))
|
||||
atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj2.data)))
|
||||
}
|
||||
default:
|
||||
fatalf(400, "Bad Request", "Destination must point to a valid object path")
|
||||
}
|
||||
|
||||
if objr2.container.name != objr2.container.name && obj2.name != obj.name {
|
||||
obj2.Lock()
|
||||
defer obj2.Unlock()
|
||||
}
|
||||
|
||||
obj2.content_type = obj.content_type
|
||||
obj2.data = obj.data
|
||||
obj2.checksum = obj.checksum
|
||||
obj2.mtime = time.Now()
|
||||
objr2.container.objects[objr2.name] = obj2
|
||||
objr2.container.bytes += len(obj.data)
|
||||
a.user.BytesUsed += int64(len(obj.data))
|
||||
|
||||
for key, values := range obj.metadata.meta {
|
||||
obj2.metadata.meta[key] = values
|
||||
}
|
||||
obj2.setMetadata(a, "object")
|
||||
|
||||
objr2.container.Lock()
|
||||
objr2.container.objects[objr2.name] = obj2
|
||||
objr2.container.bytes += int64(len(obj.data))
|
||||
objr2.container.Unlock()
|
||||
|
||||
atomic.AddInt64(&a.user.BytesUsed, int64(len(obj.data)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -620,8 +755,14 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
// ignore error from ParseForm as it's usually spurious.
|
||||
req.ParseForm()
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if fn := s.override[req.URL.Path]; fn != nil {
|
||||
originalRW := w
|
||||
recorder := httptest.NewRecorder()
|
||||
w = recorder
|
||||
defer func() {
|
||||
fn(originalRW, req, recorder)
|
||||
}()
|
||||
}
|
||||
|
||||
if DEBUG {
|
||||
log.Printf("swifttest %q %q", req.Method, req.URL)
|
||||
|
@ -630,9 +771,9 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
srv: s,
|
||||
w: w,
|
||||
req: req,
|
||||
reqId: fmt.Sprintf("%09X", s.reqId),
|
||||
reqId: fmt.Sprintf("%09X", atomic.LoadInt64(&s.reqId)),
|
||||
}
|
||||
s.reqId++
|
||||
atomic.AddInt64(&s.reqId, 1)
|
||||
|
||||
var r resource
|
||||
defer func() {
|
||||
|
@ -651,6 +792,8 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
if req.URL.String() == "/v1.0" {
|
||||
username := req.Header.Get("x-auth-user")
|
||||
key := req.Header.Get("x-auth-key")
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if acct, ok := s.Accounts[username]; ok {
|
||||
if acct.password == key {
|
||||
r := make([]byte, 16)
|
||||
|
@ -676,6 +819,11 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
"tempurl": map[string]interface{}{
|
||||
"methods": []string{"GET", "HEAD", "PUT"},
|
||||
},
|
||||
"slo": map[string]interface{}{
|
||||
"max_manifest_segments": 1000,
|
||||
"max_manifest_size": 2097152,
|
||||
"min_segment_size": 1,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
@ -688,9 +836,11 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
if key == "" && signature != "" && expires != "" {
|
||||
accountName, _, _, _ := s.parseURL(req.URL)
|
||||
secretKey := ""
|
||||
s.RLock()
|
||||
if account, ok := s.Accounts[accountName]; ok {
|
||||
secretKey = account.meta.Get("X-Account-Meta-Temp-Url-Key")
|
||||
}
|
||||
s.RUnlock()
|
||||
|
||||
get_hmac := func(method string) string {
|
||||
mac := hmac.New(sha1.New, []byte(secretKey))
|
||||
|
@ -707,12 +857,16 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
panic(notAuthorized())
|
||||
}
|
||||
} else {
|
||||
s.RLock()
|
||||
session, ok := s.Sessions[key[7:]]
|
||||
if !ok {
|
||||
s.RUnlock()
|
||||
panic(notAuthorized())
|
||||
return
|
||||
}
|
||||
|
||||
a.user = s.Accounts[session.username]
|
||||
s.RUnlock()
|
||||
}
|
||||
|
||||
switch req.Method {
|
||||
|
@ -746,6 +900,14 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *SwiftServer) SetOverride(path string, fn HandlerOverrideFunc) {
|
||||
s.override[path] = fn
|
||||
}
|
||||
|
||||
func (s *SwiftServer) UnsetOverride(path string) {
|
||||
delete(s.override, path)
|
||||
}
|
||||
|
||||
func jsonMarshal(w io.Writer, x interface{}) {
|
||||
if err := json.NewEncoder(w).Encode(x); err != nil {
|
||||
panic(fmt.Errorf("error marshalling %#v: %v", x, err))
|
||||
|
@ -773,14 +935,21 @@ func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) {
|
|||
fatalf(404, "InvalidURI", err.Error())
|
||||
}
|
||||
|
||||
srv.RLock()
|
||||
account, ok := srv.Accounts[accountName]
|
||||
if !ok {
|
||||
//srv.RUnlock()
|
||||
fatalf(404, "NoSuchAccount", "The specified account does not exist")
|
||||
}
|
||||
srv.RUnlock()
|
||||
|
||||
account.RLock()
|
||||
if containerName == "" {
|
||||
account.RUnlock()
|
||||
return rootResource{}
|
||||
}
|
||||
account.RUnlock()
|
||||
|
||||
b := containerResource{
|
||||
name: containerName,
|
||||
container: account.Containers[containerName],
|
||||
|
@ -800,6 +969,8 @@ func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) {
|
|||
container: b.container,
|
||||
}
|
||||
|
||||
objr.container.RLock()
|
||||
defer objr.container.RUnlock()
|
||||
if obj := objr.container.objects[objr.name]; obj != nil {
|
||||
objr.object = obj
|
||||
}
|
||||
|
@ -835,9 +1006,12 @@ func (rootResource) get(a *action) interface{} {
|
|||
|
||||
h := a.w.Header()
|
||||
|
||||
h.Set("X-Account-Bytes-Used", strconv.Itoa(int(a.user.BytesUsed)))
|
||||
h.Set("X-Account-Container-Count", strconv.Itoa(int(a.user.Account.Containers)))
|
||||
h.Set("X-Account-Object-Count", strconv.Itoa(int(a.user.Objects)))
|
||||
h.Set("X-Account-Bytes-Used", strconv.Itoa(int(atomic.LoadInt64(&a.user.BytesUsed))))
|
||||
h.Set("X-Account-Container-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Account.Containers))))
|
||||
h.Set("X-Account-Object-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Objects))))
|
||||
|
||||
a.user.RLock()
|
||||
defer a.user.RUnlock()
|
||||
|
||||
// add metadata
|
||||
a.user.metadata.getMetadata(a)
|
||||
|
@ -862,7 +1036,7 @@ func (rootResource) get(a *action) interface{} {
|
|||
}
|
||||
if format == "json" {
|
||||
resp = append(resp, Folder{
|
||||
Count: len(container.objects),
|
||||
Count: int64(len(container.objects)),
|
||||
Bytes: container.bytes,
|
||||
Name: container.name,
|
||||
})
|
||||
|
@ -879,7 +1053,9 @@ func (rootResource) get(a *action) interface{} {
|
|||
}
|
||||
|
||||
func (r rootResource) post(a *action) interface{} {
|
||||
a.user.Lock()
|
||||
a.user.metadata.setMetadata(a, "account")
|
||||
a.user.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -894,21 +1070,10 @@ func (rootResource) delete(a *action) interface{} {
|
|||
func (rootResource) copy(a *action) interface{} { return notAllowed() }
|
||||
|
||||
func NewSwiftServer(address string) (*SwiftServer, error) {
|
||||
var (
|
||||
l net.Listener
|
||||
err error
|
||||
)
|
||||
if strings.Index(address, ":") == -1 {
|
||||
for port := 1024; port < 65535; port++ {
|
||||
addr := fmt.Sprintf("%s:%d", address, port)
|
||||
if l, err = net.Listen("tcp", addr); err == nil {
|
||||
address = addr
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
l, err = net.Listen("tcp", address)
|
||||
address += ":0"
|
||||
}
|
||||
l, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot listen on %s: %v", address, err)
|
||||
}
|
||||
|
@ -919,6 +1084,7 @@ func NewSwiftServer(address string) (*SwiftServer, error) {
|
|||
URL: "http://" + l.Addr().String() + "/v1",
|
||||
Accounts: make(map[string]*account),
|
||||
Sessions: make(map[string]*session),
|
||||
override: make(map[string]HandlerOverrideFunc),
|
||||
}
|
||||
|
||||
server.Accounts[TEST_ACCOUNT] = &account{
|
||||
|
|
4
vendor/github.com/ncw/swift/timeout_reader.go
generated
vendored
4
vendor/github.com/ncw/swift/timeout_reader.go
generated
vendored
|
@ -38,10 +38,12 @@ func (t *timeoutReader) Read(p []byte) (int, error) {
|
|||
done <- result{n, err}
|
||||
}()
|
||||
// Wait for the read or the timeout
|
||||
timer := time.NewTimer(t.timeout)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case r := <-done:
|
||||
return r.n, r.err
|
||||
case <-time.After(t.timeout):
|
||||
case <-timer.C:
|
||||
t.cancel()
|
||||
return 0, TimeoutError
|
||||
}
|
||||
|
|
43
vendor/github.com/ncw/swift/watchdog_reader.go
generated
vendored
43
vendor/github.com/ncw/swift/watchdog_reader.go
generated
vendored
|
@ -5,29 +5,50 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
var watchdogChunkSize = 1 << 20 // 1 MiB
|
||||
|
||||
// An io.Reader which resets a watchdog timer whenever data is read
|
||||
type watchdogReader struct {
|
||||
timeout time.Duration
|
||||
reader io.Reader
|
||||
timer *time.Timer
|
||||
timeout time.Duration
|
||||
reader io.Reader
|
||||
timer *time.Timer
|
||||
chunkSize int
|
||||
}
|
||||
|
||||
// Returns a new reader which will kick the watchdog timer whenever data is read
|
||||
func newWatchdogReader(reader io.Reader, timeout time.Duration, timer *time.Timer) *watchdogReader {
|
||||
return &watchdogReader{
|
||||
timeout: timeout,
|
||||
reader: reader,
|
||||
timer: timer,
|
||||
timeout: timeout,
|
||||
reader: reader,
|
||||
timer: timer,
|
||||
chunkSize: watchdogChunkSize,
|
||||
}
|
||||
}
|
||||
|
||||
// Read reads up to len(p) bytes into p
|
||||
func (t *watchdogReader) Read(p []byte) (n int, err error) {
|
||||
// FIXME limit the amount of data read in one chunk so as to not exceed the timeout?
|
||||
func (t *watchdogReader) Read(p []byte) (int, error) {
|
||||
//read from underlying reader in chunks not larger than t.chunkSize
|
||||
//while resetting the watchdog timer before every read; the small chunk
|
||||
//size ensures that the timer does not fire when reading a large amount of
|
||||
//data from a slow connection
|
||||
start := 0
|
||||
end := len(p)
|
||||
for start < end {
|
||||
length := end - start
|
||||
if length > t.chunkSize {
|
||||
length = t.chunkSize
|
||||
}
|
||||
|
||||
resetTimer(t.timer, t.timeout)
|
||||
n, err := t.reader.Read(p[start : start+length])
|
||||
start += n
|
||||
if n == 0 || err != nil {
|
||||
return start, err
|
||||
}
|
||||
}
|
||||
|
||||
resetTimer(t.timer, t.timeout)
|
||||
n, err = t.reader.Read(p)
|
||||
resetTimer(t.timer, t.timeout)
|
||||
return
|
||||
return start, nil
|
||||
}
|
||||
|
||||
// Check it satisfies the interface
|
||||
|
|
Loading…
Reference in a new issue