From 5eed0fd22d9f951cf84ee99ac47dab128be8daa5 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 23 Jun 2021 13:51:53 +0300 Subject: [PATCH] [#67] Added streaming to download Signed-off-by: Denis Kirillov --- downloader/download.go | 85 ++++++++++++++++++++++++++++++++------- downloader/reader_test.go | 43 ++++++++++++++++++++ go.mod | 2 +- go.sum | 3 +- 4 files changed, 117 insertions(+), 16 deletions(-) create mode 100644 downloader/reader_test.go diff --git a/downloader/download.go b/downloader/download.go index b8ed12a..079244a 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -9,7 +9,6 @@ import ( "path" "strconv" "strings" - "sync" "time" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -25,9 +24,11 @@ import ( type ( detector struct { - io.Writer - sync.Once + io.Reader + err error contentType string + done chan struct{} + data []byte } request struct { @@ -36,17 +37,57 @@ type ( } objectIDs []*object.ID + + errReader struct { + data []byte + err error + offset int + } ) -func newDetector(w io.Writer) *detector { - return &detector{Writer: w} +func newReader(data []byte, err error) *errReader { + return &errReader{data: data, err: err} } -func (d *detector) Write(data []byte) (int, error) { - d.Once.Do(func() { - d.contentType = http.DetectContentType(data) - }) - return d.Writer.Write(data) +func (r *errReader) Read(b []byte) (int, error) { + if r.offset >= len(r.data) { + return 0, io.EOF + } + n := copy(b, r.data[r.offset:]) + r.offset += n + if r.offset >= len(r.data) { + return n, r.err + } + return n, nil +} + +const contentTypeDetectSize = 512 + +func newDetector() *detector { + return &detector{done: make(chan struct{}), data: make([]byte, contentTypeDetectSize)} +} + +func (d *detector) Wait() { + <-d.done +} + +func (d *detector) SetReader(reader io.Reader) { + d.Reader = reader +} + +func (d *detector) Detect() { + n, err := d.Reader.Read(d.data) + if err != nil && err != io.EOF { + d.err = err + return + } + d.data = d.data[:n] + d.contentType = http.DetectContentType(d.data) + close(d.done) +} + +func (d *detector) MultiReader() io.Reader { + return io.MultiReader(newReader(d.data, d.err), d.Reader) } func isValidToken(s string) bool { @@ -84,10 +125,13 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address) r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest) return } - writer := newDetector(r.Response.BodyWriter()) + readDetector := newDetector() options := new(client.GetObjectParams). WithAddress(objectAddress). - WithPayloadWriter(writer) + WithPayloadReaderHandler(func(reader io.Reader) { + readDetector.SetReader(reader) + readDetector.Detect() + }) obj, err = clnt.GetObject( r.RequestCtx, @@ -119,7 +163,9 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address) if r.Request.URI().QueryArgs().GetBool("download") { dis = "attachment" } + r.Response.SetBodyStream(readDetector.MultiReader(), int(obj.PayloadSize())) r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10)) + var contentType string for _, attr := range obj.Attributes() { key := attr.Key() val := attr.Value() @@ -142,13 +188,24 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address) r.Response.Header.Set("Last-Modified", time.Unix(value, 0).Format(time.RFC1123)) case object.AttributeContentType: - writer.contentType = val + contentType = val } } r.Response.Header.Set("x-object-id", obj.ID().String()) r.Response.Header.Set("x-owner-id", obj.OwnerID().String()) r.Response.Header.Set("x-container-id", obj.ContainerID().String()) - r.SetContentType(writer.contentType) + + if len(contentType) == 0 { + if readDetector.err != nil { + r.log.Error("could not read object", zap.Error(err)) + r.Error("could not read object", fasthttp.StatusBadRequest) + return + } + readDetector.Wait() + contentType = readDetector.contentType + } + r.SetContentType(contentType) + r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename)) } diff --git a/downloader/reader_test.go b/downloader/reader_test.go new file mode 100644 index 0000000..a88795d --- /dev/null +++ b/downloader/reader_test.go @@ -0,0 +1,43 @@ +package downloader + +import ( + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReader(t *testing.T) { + data := []byte("test string") + err := fmt.Errorf("something wrong") + + for _, tc := range []struct { + err error + buff []byte + }{ + {err: nil, buff: make([]byte, len(data)+1)}, + {err: nil, buff: make([]byte, len(data))}, + {err: nil, buff: make([]byte, len(data)-1)}, + {err: err, buff: make([]byte, len(data)+1)}, + {err: err, buff: make([]byte, len(data))}, + {err: err, buff: make([]byte, len(data)-1)}, + } { + var res []byte + var err error + var n int + + r := newReader(data, tc.err) + for err == nil { + n, err = r.Read(tc.buff) + res = append(res, tc.buff[:n]...) + } + + if tc.err == nil { + require.Equal(t, io.EOF, err) + } else { + require.Equal(t, tc.err, err) + } + require.Equal(t, data, res) + } +} diff --git a/go.mod b/go.mod index 78d6162..4eb2e41 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/fasthttp/router v1.3.5 github.com/mr-tron/base58 v1.1.3 // indirect github.com/nspcc-dev/neo-go v0.95.3 - github.com/nspcc-dev/neofs-api-go v1.27.1 + github.com/nspcc-dev/neofs-api-go v1.27.2-0.20210623111558-6d531a07a53d github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210615074944-86a9aa92599b github.com/prometheus/client_golang v1.9.0 github.com/prometheus/common v0.15.0 diff --git a/go.sum b/go.sum index c053121..5f5a1be 100644 --- a/go.sum +++ b/go.sum @@ -329,8 +329,9 @@ github.com/nspcc-dev/neo-go v0.95.3 h1:RxBKcmmatbSM2cETGhv3ritmrkU0gUnWItNZvtrBt github.com/nspcc-dev/neo-go v0.95.3/go.mod h1:t15xRFDVhz5o/pstptdoW9N9JJBNn1hZ6APMNiC6MrY= github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8= github.com/nspcc-dev/neofs-api-go v1.27.0/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE= -github.com/nspcc-dev/neofs-api-go v1.27.1 h1:ONdKOnm0/hK6m38VTUliCHY6RTxg+IpAzY4G+BeOZG4= github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE= +github.com/nspcc-dev/neofs-api-go v1.27.2-0.20210623111558-6d531a07a53d h1:e8F3ijxrAJSf2jgJKFtgp/nay4Tx5EpZ0vFiKmDfosI= +github.com/nspcc-dev/neofs-api-go v1.27.2-0.20210623111558-6d531a07a53d/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE= github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA= github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=