forked from TrueCloudLab/frostfs-http-gw
[#67] Added streaming to download
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
222a434e5c
commit
5eed0fd22d
4 changed files with 117 additions and 16 deletions
|
@ -9,7 +9,6 @@ import (
|
|||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
|
@ -25,9 +24,11 @@ import (
|
|||
|
||||
type (
|
||||
detector struct {
|
||||
io.Writer
|
||||
sync.Once
|
||||
io.Reader
|
||||
err error
|
||||
contentType string
|
||||
done chan struct{}
|
||||
data []byte
|
||||
}
|
||||
|
||||
request struct {
|
||||
|
@ -36,17 +37,57 @@ type (
|
|||
}
|
||||
|
||||
objectIDs []*object.ID
|
||||
|
||||
errReader struct {
|
||||
data []byte
|
||||
err error
|
||||
offset int
|
||||
}
|
||||
)
|
||||
|
||||
func newDetector(w io.Writer) *detector {
|
||||
return &detector{Writer: w}
|
||||
func newReader(data []byte, err error) *errReader {
|
||||
return &errReader{data: data, err: err}
|
||||
}
|
||||
|
||||
func (d *detector) Write(data []byte) (int, error) {
|
||||
d.Once.Do(func() {
|
||||
d.contentType = http.DetectContentType(data)
|
||||
})
|
||||
return d.Writer.Write(data)
|
||||
func (r *errReader) Read(b []byte) (int, error) {
|
||||
if r.offset >= len(r.data) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := copy(b, r.data[r.offset:])
|
||||
r.offset += n
|
||||
if r.offset >= len(r.data) {
|
||||
return n, r.err
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
const contentTypeDetectSize = 512
|
||||
|
||||
func newDetector() *detector {
|
||||
return &detector{done: make(chan struct{}), data: make([]byte, contentTypeDetectSize)}
|
||||
}
|
||||
|
||||
func (d *detector) Wait() {
|
||||
<-d.done
|
||||
}
|
||||
|
||||
func (d *detector) SetReader(reader io.Reader) {
|
||||
d.Reader = reader
|
||||
}
|
||||
|
||||
func (d *detector) Detect() {
|
||||
n, err := d.Reader.Read(d.data)
|
||||
if err != nil && err != io.EOF {
|
||||
d.err = err
|
||||
return
|
||||
}
|
||||
d.data = d.data[:n]
|
||||
d.contentType = http.DetectContentType(d.data)
|
||||
close(d.done)
|
||||
}
|
||||
|
||||
func (d *detector) MultiReader() io.Reader {
|
||||
return io.MultiReader(newReader(d.data, d.err), d.Reader)
|
||||
}
|
||||
|
||||
func isValidToken(s string) bool {
|
||||
|
@ -84,10 +125,13 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address)
|
|||
r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
writer := newDetector(r.Response.BodyWriter())
|
||||
readDetector := newDetector()
|
||||
options := new(client.GetObjectParams).
|
||||
WithAddress(objectAddress).
|
||||
WithPayloadWriter(writer)
|
||||
WithPayloadReaderHandler(func(reader io.Reader) {
|
||||
readDetector.SetReader(reader)
|
||||
readDetector.Detect()
|
||||
})
|
||||
|
||||
obj, err = clnt.GetObject(
|
||||
r.RequestCtx,
|
||||
|
@ -119,7 +163,9 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address)
|
|||
if r.Request.URI().QueryArgs().GetBool("download") {
|
||||
dis = "attachment"
|
||||
}
|
||||
r.Response.SetBodyStream(readDetector.MultiReader(), int(obj.PayloadSize()))
|
||||
r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10))
|
||||
var contentType string
|
||||
for _, attr := range obj.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
|
@ -142,13 +188,24 @@ func (r *request) receiveFile(clnt client.Object, objectAddress *object.Address)
|
|||
r.Response.Header.Set("Last-Modified",
|
||||
time.Unix(value, 0).Format(time.RFC1123))
|
||||
case object.AttributeContentType:
|
||||
writer.contentType = val
|
||||
contentType = val
|
||||
}
|
||||
}
|
||||
r.Response.Header.Set("x-object-id", obj.ID().String())
|
||||
r.Response.Header.Set("x-owner-id", obj.OwnerID().String())
|
||||
r.Response.Header.Set("x-container-id", obj.ContainerID().String())
|
||||
r.SetContentType(writer.contentType)
|
||||
|
||||
if len(contentType) == 0 {
|
||||
if readDetector.err != nil {
|
||||
r.log.Error("could not read object", zap.Error(err))
|
||||
r.Error("could not read object", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
readDetector.Wait()
|
||||
contentType = readDetector.contentType
|
||||
}
|
||||
r.SetContentType(contentType)
|
||||
|
||||
r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue