Merge pull request #72 from KirillovDenis/feature/67-download_streaming
[#67] Added streaming to download
This commit is contained in:
commit
4f3620fdff
4 changed files with 117 additions and 16 deletions
|
@ -9,7 +9,6 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
@ -25,9 +24,11 @@ import (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
detector struct {
|
detector struct {
|
||||||
io.Writer
|
io.Reader
|
||||||
sync.Once
|
err error
|
||||||
contentType string
|
contentType string
|
||||||
|
done chan struct{}
|
||||||
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
request struct {
|
request struct {
|
||||||
|
@ -36,17 +37,57 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
objectIDs []*object.ID
|
objectIDs []*object.ID
|
||||||
|
|
||||||
|
errReader struct {
|
||||||
|
data []byte
|
||||||
|
err error
|
||||||
|
offset int
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func newDetector(w io.Writer) *detector {
|
func newReader(data []byte, err error) *errReader {
|
||||||
return &detector{Writer: w}
|
return &errReader{data: data, err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *detector) Write(data []byte) (int, error) {
|
func (r *errReader) Read(b []byte) (int, error) {
|
||||||
d.Once.Do(func() {
|
if r.offset >= len(r.data) {
|
||||||
d.contentType = http.DetectContentType(data)
|
return 0, io.EOF
|
||||||
})
|
}
|
||||||
return d.Writer.Write(data)
|
n := copy(b, r.data[r.offset:])
|
||||||
|
r.offset += n
|
||||||
|
if r.offset >= len(r.data) {
|
||||||
|
return n, r.err
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const contentTypeDetectSize = 512
|
||||||
|
|
||||||
|
func newDetector() *detector {
|
||||||
|
return &detector{done: make(chan struct{}), data: make([]byte, contentTypeDetectSize)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *detector) Wait() {
|
||||||
|
<-d.done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *detector) SetReader(reader io.Reader) {
|
||||||
|
d.Reader = reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *detector) Detect() {
|
||||||
|
n, err := d.Reader.Read(d.data)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
d.err = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.data = d.data[:n]
|
||||||
|
d.contentType = http.DetectContentType(d.data)
|
||||||
|
close(d.done)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *detector) MultiReader() io.Reader {
|
||||||
|
return io.MultiReader(newReader(d.data, d.err), d.Reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func isValidToken(s string) bool {
|
func isValidToken(s string) bool {
|
||||||
|
@ -84,10 +125,13 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address)
|
||||||
r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writer := newDetector(r.Response.BodyWriter())
|
readDetector := newDetector()
|
||||||
options := new(client.GetObjectParams).
|
options := new(client.GetObjectParams).
|
||||||
WithAddress(objectAddress).
|
WithAddress(objectAddress).
|
||||||
WithPayloadWriter(writer)
|
WithPayloadReaderHandler(func(reader io.Reader) {
|
||||||
|
readDetector.SetReader(reader)
|
||||||
|
readDetector.Detect()
|
||||||
|
})
|
||||||
|
|
||||||
obj, err = clnt.GetObject(
|
obj, err = clnt.GetObject(
|
||||||
r.RequestCtx,
|
r.RequestCtx,
|
||||||
|
@ -119,7 +163,9 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address)
|
||||||
if r.Request.URI().QueryArgs().GetBool("download") {
|
if r.Request.URI().QueryArgs().GetBool("download") {
|
||||||
dis = "attachment"
|
dis = "attachment"
|
||||||
}
|
}
|
||||||
|
r.Response.SetBodyStream(readDetector.MultiReader(), int(obj.PayloadSize()))
|
||||||
r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10))
|
r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10))
|
||||||
|
var contentType string
|
||||||
for _, attr := range obj.Attributes() {
|
for _, attr := range obj.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
val := attr.Value()
|
val := attr.Value()
|
||||||
|
@ -142,13 +188,24 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address)
|
||||||
r.Response.Header.Set("Last-Modified",
|
r.Response.Header.Set("Last-Modified",
|
||||||
time.Unix(value, 0).Format(time.RFC1123))
|
time.Unix(value, 0).Format(time.RFC1123))
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
writer.contentType = val
|
contentType = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r.Response.Header.Set("x-object-id", obj.ID().String())
|
r.Response.Header.Set("x-object-id", obj.ID().String())
|
||||||
r.Response.Header.Set("x-owner-id", obj.OwnerID().String())
|
r.Response.Header.Set("x-owner-id", obj.OwnerID().String())
|
||||||
r.Response.Header.Set("x-container-id", obj.ContainerID().String())
|
r.Response.Header.Set("x-container-id", obj.ContainerID().String())
|
||||||
r.SetContentType(writer.contentType)
|
|
||||||
|
if len(contentType) == 0 {
|
||||||
|
if readDetector.err != nil {
|
||||||
|
r.log.Error("could not read object", zap.Error(err))
|
||||||
|
r.Error("could not read object", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
readDetector.Wait()
|
||||||
|
contentType = readDetector.contentType
|
||||||
|
}
|
||||||
|
r.SetContentType(contentType)
|
||||||
|
|
||||||
r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename))
|
r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
43
downloader/reader_test.go
Normal file
43
downloader/reader_test.go
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
package downloader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReader(t *testing.T) {
|
||||||
|
data := []byte("test string")
|
||||||
|
err := fmt.Errorf("something wrong")
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
err error
|
||||||
|
buff []byte
|
||||||
|
}{
|
||||||
|
{err: nil, buff: make([]byte, len(data)+1)},
|
||||||
|
{err: nil, buff: make([]byte, len(data))},
|
||||||
|
{err: nil, buff: make([]byte, len(data)-1)},
|
||||||
|
{err: err, buff: make([]byte, len(data)+1)},
|
||||||
|
{err: err, buff: make([]byte, len(data))},
|
||||||
|
{err: err, buff: make([]byte, len(data)-1)},
|
||||||
|
} {
|
||||||
|
var res []byte
|
||||||
|
var err error
|
||||||
|
var n int
|
||||||
|
|
||||||
|
r := newReader(data, tc.err)
|
||||||
|
for err == nil {
|
||||||
|
n, err = r.Read(tc.buff)
|
||||||
|
res = append(res, tc.buff[:n]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.err == nil {
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
} else {
|
||||||
|
require.Equal(t, tc.err, err)
|
||||||
|
}
|
||||||
|
require.Equal(t, data, res)
|
||||||
|
}
|
||||||
|
}
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/fasthttp/router v1.3.5
|
github.com/fasthttp/router v1.3.5
|
||||||
github.com/mr-tron/base58 v1.1.3 // indirect
|
github.com/mr-tron/base58 v1.1.3 // indirect
|
||||||
github.com/nspcc-dev/neo-go v0.95.3
|
github.com/nspcc-dev/neo-go v0.95.3
|
||||||
github.com/nspcc-dev/neofs-api-go v1.27.1
|
github.com/nspcc-dev/neofs-api-go v1.27.2-0.20210623111558-6d531a07a53d
|
||||||
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210615074944-86a9aa92599b
|
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210615074944-86a9aa92599b
|
||||||
github.com/prometheus/client_golang v1.9.0
|
github.com/prometheus/client_golang v1.9.0
|
||||||
github.com/prometheus/common v0.15.0
|
github.com/prometheus/common v0.15.0
|
||||||
|
|
3
go.sum
3
go.sum
|
@ -329,8 +329,9 @@ github.com/nspcc-dev/neo-go v0.95.3 h1:RxBKcmmatbSM2cETGhv3ritmrkU0gUnWItNZvtrBt
|
||||||
github.com/nspcc-dev/neo-go v0.95.3/go.mod h1:t15xRFDVhz5o/pstptdoW9N9JJBNn1hZ6APMNiC6MrY=
|
github.com/nspcc-dev/neo-go v0.95.3/go.mod h1:t15xRFDVhz5o/pstptdoW9N9JJBNn1hZ6APMNiC6MrY=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
|
github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.27.0/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
github.com/nspcc-dev/neofs-api-go v1.27.0/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.27.1 h1:ONdKOnm0/hK6m38VTUliCHY6RTxg+IpAzY4G+BeOZG4=
|
|
||||||
github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
||||||
|
github.com/nspcc-dev/neofs-api-go v1.27.2-0.20210623111558-6d531a07a53d h1:e8F3ijxrAJSf2jgJKFtgp/nay4Tx5EpZ0vFiKmDfosI=
|
||||||
|
github.com/nspcc-dev/neofs-api-go v1.27.2-0.20210623111558-6d531a07a53d/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
|
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
||||||
|
|
Loading…
Reference in a new issue