swift: implement streaming uploads (see #1614)

This commit is contained in:
Stefan Breunig 2017-09-14 07:42:16 +02:00
parent 56dedc49e3
commit 9d22f4208f

View file

@ -2,6 +2,7 @@
package swift package swift
import ( import (
"bufio"
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
@ -473,6 +474,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return fs, fs.Update(in, src, options...) return fs, fs.Update(in, src, options...)
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(dir string) error { func (f *Fs) Mkdir(dir string) error {
f.containerOKMu.Lock() f.containerOKMu.Lock()
@ -779,7 +785,7 @@ func urlEncode(str string) string {
// updateChunks updates the existing object using chunks to a separate // updateChunks updates the existing object using chunks to a separate
// container. It returns a string which prefixes current segments. // container. It returns a string which prefixes current segments.
func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, contentType string) (string, error) { func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
// Create the segmentsContainer if it doesn't exist // Create the segmentsContainer if it doesn't exist
err := o.fs.c.ContainerCreate(o.fs.segmentsContainer, nil) err := o.fs.c.ContainerCreate(o.fs.segmentsContainer, nil)
if err != nil { if err != nil {
@ -790,9 +796,22 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c
i := 0 i := 0
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size) uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
segmentsPath := fmt.Sprintf("%s%s/%s", o.fs.root, o.remote, uniquePrefix) segmentsPath := fmt.Sprintf("%s%s/%s", o.fs.root, o.remote, uniquePrefix)
for left > 0 { in := bufio.NewReader(in0)
n := min(left, int64(chunkSize)) for {
// can we read at least one byte?
if _, err := in.Peek(1); err != nil {
if left > 0 {
return "", err // read less than expected
}
fs.Debugf(o, "Uploading segments into %q seems done (%v)", o.fs.segmentsContainer, err)
break
}
n := int64(chunkSize)
if size != -1 {
n = min(left, n)
headers["Content-Length"] = strconv.FormatInt(n, 10) // set Content-Length as we know it headers["Content-Length"] = strconv.FormatInt(n, 10) // set Content-Length as we know it
left -= n
}
segmentReader := io.LimitReader(in, n) segmentReader := io.LimitReader(in, n)
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i) segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer) fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer)
@ -800,7 +819,6 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c
if err != nil { if err != nil {
return "", err return "", err
} }
left -= n
i++ i++
} }
// Upload the manifest // Upload the manifest
@ -838,7 +856,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
contentType := fs.MimeType(src) contentType := fs.MimeType(src)
headers := m.ObjectHeaders() headers := m.ObjectHeaders()
uniquePrefix := "" uniquePrefix := ""
if size > int64(chunkSize) { if size > int64(chunkSize) || size == -1 {
uniquePrefix, err = o.updateChunks(in, headers, size, contentType) uniquePrefix, err = o.updateChunks(in, headers, size, contentType)
if err != nil { if err != nil {
return err return err
@ -894,6 +912,7 @@ func (o *Object) MimeType() string {
var ( var (
_ fs.Fs = &Fs{} _ fs.Fs = &Fs{}
_ fs.Purger = &Fs{} _ fs.Purger = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Copier = &Fs{} _ fs.Copier = &Fs{}
_ fs.ListRer = &Fs{} _ fs.ListRer = &Fs{}
_ fs.Object = &Object{} _ fs.Object = &Object{}