[#163] Fix zip streaming

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-06-17 12:17:02 +03:00 committed by Alex Vanin
parent 0a0af13bea
commit 85ee0c44a2
2 changed files with 76 additions and 71 deletions

View file

@ -2,6 +2,7 @@ package downloader
import ( import (
"archive/zip" "archive/zip"
"bufio"
"bytes" "bytes"
"context" "context"
"errors" "errors"
@ -383,6 +384,19 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string
return d.pool.SearchObjects(d.appCtx, prm) return d.pool.SearchObjects(d.appCtx, prm)
} }
func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
method := zip.Store
if d.settings.ZipCompression {
method = zip.Deflate
}
return zw.CreateHeader(&zip.FileHeader{
Name: getZipFilePath(obj),
Method: method,
Modified: time.Now(),
})
}
// DownloadZipped handles zip by prefix requests. // DownloadZipped handles zip by prefix requests.
func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
scid, _ := c.UserValue("cid").(string) scid, _ := c.UserValue("cid").(string)
@ -409,99 +423,90 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
return return
} }
defer resSearch.Close()
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip") c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"") c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
c.Response.SetStatusCode(http.StatusOK) c.Response.SetStatusCode(http.StatusOK)
zipWriter := zip.NewWriter(c) c.SetBodyStreamWriter(func(w *bufio.Writer) {
compression := zip.Store defer resSearch.Close()
if d.settings.ZipCompression {
compression = zip.Deflate
}
var ( zipWriter := zip.NewWriter(w)
addr address.Address
resGet *pool.ResGetObject
w io.Writer
bufZip []byte
)
addr.SetContainerID(*containerID) var bufZip []byte
var addr address.Address
btoken := bearerToken(c) empty := true
empty := true called := false
called := false btoken := bearerToken(c)
addr.SetContainerID(*containerID)
errIter := resSearch.Iterate(func(id oid.ID) bool { errIter := resSearch.Iterate(func(id oid.ID) bool {
called = true called = true
if empty { if empty {
bufZip = make([]byte, 1024) // configure? bufZip = make([]byte, 3<<20) // the same as for upload
} }
empty = false
empty = false addr.SetObjectID(id)
if err = d.zipObject(zipWriter, addr, btoken, bufZip); err != nil {
return true
}
addr.SetObjectID(id) return false
var prm pool.PrmObjectGet
prm.SetAddress(addr)
if btoken != nil {
prm.UseBearer(*btoken)
}
resGet, err = d.pool.GetObject(d.appCtx, prm)
if err != nil {
err = fmt.Errorf("get NeoFS object: %v", err)
return true
}
w, err = zipWriter.CreateHeader(&zip.FileHeader{
Name: getZipFilePath(&resGet.Header),
Method: compression,
Modified: time.Now(),
}) })
if err != nil { if errIter != nil {
err = fmt.Errorf("zip create header: %v", err) log.Error("iterating over selected objects failed", zap.Error(errIter))
return true response.Error(c, "iterating over selected objects: "+errIter.Error(), fasthttp.StatusBadRequest)
return
} else if !called {
log.Error("objects not found")
response.Error(c, "objects not found", fasthttp.StatusNotFound)
return
} }
_, err = io.CopyBuffer(w, resGet.Payload, bufZip) if err == nil {
if err != nil { err = zipWriter.Close()
err = fmt.Errorf("copy object payload to zip file: %v", err)
return true
} }
_ = resGet.Payload.Close()
err = zipWriter.Flush()
if err != nil { if err != nil {
err = fmt.Errorf("flush zip writer: %v", err) log.Error("file streaming failure", zap.Error(err))
return true response.Error(c, "file streaming failure: "+err.Error(), fasthttp.StatusInternalServerError)
return
} }
return false
}) })
if errIter != nil { }
log.Error("iterating over selected objects failed", zap.Error(errIter))
response.Error(c, "iterating over selected objects: "+errIter.Error(), fasthttp.StatusBadRequest) func (d *Downloader) zipObject(zipWriter *zip.Writer, addr address.Address, btoken *bearer.Token, bufZip []byte) error {
return var prm pool.PrmObjectGet
} else if !called { prm.SetAddress(addr)
log.Error("objects not found") if btoken != nil {
response.Error(c, "objects not found", fasthttp.StatusNotFound) prm.UseBearer(*btoken)
return
}
if err == nil {
err = zipWriter.Close()
} }
resGet, err := d.pool.GetObject(d.appCtx, prm)
if err != nil { if err != nil {
log.Error("file streaming failure", zap.Error(err)) return fmt.Errorf("get NeoFS object: %v", err)
response.Error(c, "file streaming failure: "+err.Error(), fasthttp.StatusInternalServerError)
return
} }
objWriter, err := d.addObjectToZip(zipWriter, &resGet.Header)
if err != nil {
return fmt.Errorf("zip create header: %v", err)
}
if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil {
return fmt.Errorf("copy object payload to zip file: %v", err)
}
if err = resGet.Payload.Close(); err != nil {
return fmt.Errorf("object body close error: %w", err)
}
if err = zipWriter.Flush(); err != nil {
return fmt.Errorf("flush zip writer: %v", err)
}
return nil
} }
func getZipFilePath(obj *object.Object) string { func getZipFilePath(obj *object.Object) string {

View file

@ -276,7 +276,7 @@ func makeZipRequest(t *testing.T, url string, names, contents []string) {
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(resp.Body)
require.NoError(t, err) require.NoError(t, err)
checkZip(t, data, resp.ContentLength, names, contents) checkZip(t, data, int64(len(data)), names, contents)
} }
func checkZip(t *testing.T, data []byte, length int64, names, contents []string) { func checkZip(t *testing.T, data []byte, length int64, names, contents []string) {