From 85ee0c44a27c358e0726de5a6d3f6c984d54722c Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 17 Jun 2022 12:17:02 +0300 Subject: [PATCH] [#163] Fix zip streaming Signed-off-by: Denis Kirillov --- downloader/download.go | 145 +++++++++++++++++++++-------------------- integration_test.go | 2 +- 2 files changed, 76 insertions(+), 71 deletions(-) diff --git a/downloader/download.go b/downloader/download.go index 5456f91..21726f4 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -2,6 +2,7 @@ package downloader import ( "archive/zip" + "bufio" "bytes" "context" "errors" @@ -383,6 +384,19 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string return d.pool.SearchObjects(d.appCtx, prm) } +func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) { + method := zip.Store + if d.settings.ZipCompression { + method = zip.Deflate + } + + return zw.CreateHeader(&zip.FileHeader{ + Name: getZipFilePath(obj), + Method: method, + Modified: time.Now(), + }) +} + // DownloadZipped handles zip by prefix requests. func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { scid, _ := c.UserValue("cid").(string) @@ -409,99 +423,90 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { return } - defer resSearch.Close() - c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip") c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"") c.Response.SetStatusCode(http.StatusOK) - zipWriter := zip.NewWriter(c) - compression := zip.Store - if d.settings.ZipCompression { - compression = zip.Deflate - } + c.SetBodyStreamWriter(func(w *bufio.Writer) { + defer resSearch.Close() - var ( - addr address.Address - resGet *pool.ResGetObject - w io.Writer - bufZip []byte - ) + zipWriter := zip.NewWriter(w) - addr.SetContainerID(*containerID) + var bufZip []byte + var addr address.Address - btoken := bearerToken(c) - empty := true - called := false + empty := true + called := false + btoken := bearerToken(c) + addr.SetContainerID(*containerID) - errIter := resSearch.Iterate(func(id oid.ID) bool { - called = true + errIter := resSearch.Iterate(func(id oid.ID) bool { + called = true - if empty { - bufZip = make([]byte, 1024) // configure? - } + if empty { + bufZip = make([]byte, 3<<20) // the same as for upload + } + empty = false - empty = false + addr.SetObjectID(id) + if err = d.zipObject(zipWriter, addr, btoken, bufZip); err != nil { + return true + } - addr.SetObjectID(id) - - var prm pool.PrmObjectGet - prm.SetAddress(addr) - if btoken != nil { - prm.UseBearer(*btoken) - } - - resGet, err = d.pool.GetObject(d.appCtx, prm) - if err != nil { - err = fmt.Errorf("get NeoFS object: %v", err) - return true - } - - w, err = zipWriter.CreateHeader(&zip.FileHeader{ - Name: getZipFilePath(&resGet.Header), - Method: compression, - Modified: time.Now(), + return false }) - if err != nil { - err = fmt.Errorf("zip create header: %v", err) - return true + if errIter != nil { + log.Error("iterating over selected objects failed", zap.Error(errIter)) + response.Error(c, "iterating over selected objects: "+errIter.Error(), fasthttp.StatusBadRequest) + return + } else if !called { + log.Error("objects not found") + response.Error(c, "objects not found", fasthttp.StatusNotFound) + return } - _, err = io.CopyBuffer(w, resGet.Payload, bufZip) - if err != nil { - err = fmt.Errorf("copy object payload to zip file: %v", err) - return true + if err == nil { + err = zipWriter.Close() } - _ = resGet.Payload.Close() - - err = zipWriter.Flush() if err != nil { - err = fmt.Errorf("flush zip writer: %v", err) - return true + log.Error("file streaming failure", zap.Error(err)) + response.Error(c, "file streaming failure: "+err.Error(), fasthttp.StatusInternalServerError) + return } - - return false }) - if errIter != nil { - log.Error("iterating over selected objects failed", zap.Error(errIter)) - response.Error(c, "iterating over selected objects: "+errIter.Error(), fasthttp.StatusBadRequest) - return - } else if !called { - log.Error("objects not found") - response.Error(c, "objects not found", fasthttp.StatusNotFound) - return - } - - if err == nil { - err = zipWriter.Close() +} + +func (d *Downloader) zipObject(zipWriter *zip.Writer, addr address.Address, btoken *bearer.Token, bufZip []byte) error { + var prm pool.PrmObjectGet + prm.SetAddress(addr) + if btoken != nil { + prm.UseBearer(*btoken) } + resGet, err := d.pool.GetObject(d.appCtx, prm) if err != nil { - log.Error("file streaming failure", zap.Error(err)) - response.Error(c, "file streaming failure: "+err.Error(), fasthttp.StatusInternalServerError) - return + return fmt.Errorf("get NeoFS object: %v", err) } + + objWriter, err := d.addObjectToZip(zipWriter, &resGet.Header) + if err != nil { + return fmt.Errorf("zip create header: %v", err) + } + + if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil { + return fmt.Errorf("copy object payload to zip file: %v", err) + } + + if err = resGet.Payload.Close(); err != nil { + return fmt.Errorf("object body close error: %w", err) + } + + if err = zipWriter.Flush(); err != nil { + return fmt.Errorf("flush zip writer: %v", err) + } + + return nil } func getZipFilePath(obj *object.Object) string { diff --git a/integration_test.go b/integration_test.go index 413e8e2..ecb85bf 100644 --- a/integration_test.go +++ b/integration_test.go @@ -276,7 +276,7 @@ func makeZipRequest(t *testing.T, url string, names, contents []string) { data, err := io.ReadAll(resp.Body) require.NoError(t, err) - checkZip(t, data, resp.ContentLength, names, contents) + checkZip(t, data, int64(len(data)), names, contents) } func checkZip(t *testing.T, data []byte, length int64, names, contents []string) {