From c482bbd25a91684ac827b7da0b9991dbf0e09617 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 1 Mar 2022 23:38:03 +0300 Subject: [PATCH] [#126] downloader: Replace `Read` with `Iterate` on `ObjectListReader` Make `Downloader.DownloadZipped` to call `Iterate` method instead of `Read` one during processing the `ObjectListReader`. Signed-off-by: Leonard Lyubich --- downloader/download.go | 96 +++++++++++++++++++----------------------- go.mod | 4 +- go.sum | 8 ++-- integration_test.go | 13 +++--- 4 files changed, 56 insertions(+), 65 deletions(-) diff --git a/downloader/download.go b/downloader/download.go index 89ebb43..dd810f1 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -404,29 +404,10 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { optBearer := bearerOpts(c) empty := true - n := 0 - buf := make([]oid.ID, 10) // configure? + called := false -iterator: - for { - n, err = resSearch.Read(buf) - if err != nil { - if errors.Is(err, io.EOF) { - if empty { - log.Error("objects not found", zap.Error(err)) - response.Error(c, "objects not found", fasthttp.StatusNotFound) - return - } - - err = nil - - break - } - - log.Error("read object list failed", zap.Error(err)) - response.Error(c, "read object list failed", fasthttp.StatusBadRequest) // maybe best effort? - return - } + errIter := resSearch.Iterate(func(id oid.ID) bool { + called = true if empty { bufZip = make([]byte, 1024) // configure? @@ -434,39 +415,48 @@ iterator: empty = false - for i := range buf[:n] { - addr.SetObjectID(&buf[i]) + addr.SetObjectID(&id) - resGet, err = d.pool.GetObject(c, addr, optBearer) - if err != nil { - err = fmt.Errorf("get NeoFS object: %v", err) - break iterator - } - - w, err = zipWriter.CreateHeader(&zip.FileHeader{ - Name: getFilename(&resGet.Header), - Method: compression, - Modified: time.Now(), - }) - if err != nil { - err = fmt.Errorf("zip create header: %v", err) - break iterator - } - - _, err = io.CopyBuffer(w, resGet.Payload, bufZip) - if err != nil { - err = fmt.Errorf("copy object payload to zip file: %v", err) - break iterator - } - - _ = resGet.Payload.Close() - - err = zipWriter.Flush() - if err != nil { - err = fmt.Errorf("flush zip writer: %v", err) - break iterator - } + resGet, err = d.pool.GetObject(c, addr, optBearer) + if err != nil { + err = fmt.Errorf("get NeoFS object: %v", err) + return true } + + w, err = zipWriter.CreateHeader(&zip.FileHeader{ + Name: getFilename(&resGet.Header), + Method: compression, + Modified: time.Now(), + }) + if err != nil { + err = fmt.Errorf("zip create header: %v", err) + return true + } + + _, err = io.CopyBuffer(w, resGet.Payload, bufZip) + if err != nil { + err = fmt.Errorf("copy object payload to zip file: %v", err) + return true + } + + _ = resGet.Payload.Close() + + err = zipWriter.Flush() + if err != nil { + err = fmt.Errorf("flush zip writer: %v", err) + return true + } + + return false + }) + if errIter != nil { + log.Error("iterating over selected objects failed", zap.Error(errIter)) + response.Error(c, "iterating over selected objects", fasthttp.StatusBadRequest) + return + } else if !called { + log.Error("objects not found") + response.Error(c, "objects not found", fasthttp.StatusNotFound) + return } if err == nil { diff --git a/go.mod b/go.mod index 33310c6..f838189 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,8 @@ require ( github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/klauspost/compress v1.13.1 // indirect github.com/nspcc-dev/neo-go v0.98.0 - github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220221122137-66bc59da5c02 + github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220228071935-07817fb4032d github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.29.0 github.com/prometheus/procfs v0.7.1 // indirect diff --git a/go.sum b/go.sum index f90fd90..051c241 100644 --- a/go.sum +++ b/go.sum @@ -589,15 +589,15 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1: github.com/nspcc-dev/neo-go v0.98.0 h1:yyW4sgY88/pLf0949qmgfkQXzRKC3CI/WyhqXNnwMd8= github.com/nspcc-dev/neo-go v0.98.0/go.mod h1:E3cc1x6RXSXrJb2nDWXTXjnXk3rIqVN8YdFyWv+FrqM= github.com/nspcc-dev/neofs-api-go/v2 v2.11.0-pre.0.20211201134523-3604d96f3fe1/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs= -github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5 h1:y9tbmUYhcr052QXsa4/IfUKAi2cx3TGDsEZUAow3P/Y= -github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs= +github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 h1:xWqXzorDk9WFMTtWP7cwwlyJDL1X6Z4HT1e5zqkq7xY= +github.com/nspcc-dev/neofs-api-go/v2 v2.12.0/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs= github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA= github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220221122137-66bc59da5c02 h1:g9tIrZU45dVFUSiY7Bb8m43rV/CJiIoPgQrxnbtKfKE= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220221122137-66bc59da5c02/go.mod h1:NeDPJaKJ6yCOWXRmfc3aRrhBPEOeAPD7q/6bp1UQCbs= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220228071935-07817fb4032d h1:ku9s0XJ2LoWbB6nUjkyP7M8ki2nLOlMIvi4fAocf+iY= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220228071935-07817fb4032d/go.mod h1:/WV31AQHs6YLTjMgMjMZw8Z3/Q7b6kMjNgJVsRab5AU= github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= diff --git a/integration_test.go b/integration_test.go index 4866fc7..bac23bf 100644 --- a/integration_test.go +++ b/integration_test.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "io" - "math" "mime/multipart" "net/http" "sort" @@ -47,7 +46,8 @@ func TestIntegration(t *testing.T) { aioContainer := createDockerContainer(ctx, t, aioImage+version) cancel := runServer() clientPool := getPool(ctx, t, key) - CID := createContainer(ctx, t, clientPool) + CID, err := createContainer(ctx, t, clientPool) + require.NoError(t, err, version) t.Run("simple put "+version, func(t *testing.T) { simplePut(ctx, t, clientPool, CID) }) t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, CID) }) @@ -285,7 +285,7 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) pool.Pool return clientPool } -func createContainer(ctx context.Context, t *testing.T, clientPool pool.Pool) *cid.ID { +func createContainer(ctx context.Context, t *testing.T, clientPool pool.Pool) (*cid.ID, error) { pp, err := policy.Parse("REP 1") require.NoError(t, err) @@ -297,16 +297,17 @@ func createContainer(ctx context.Context, t *testing.T, clientPool pool.Pool) *c cnr.SetOwnerID(clientPool.OwnerID()) CID, err := clientPool.PutContainer(ctx, cnr) - require.NoError(t, err) + if err != nil { + return nil, err + } fmt.Println(CID.String()) err = clientPool.WaitForContainerPresence(ctx, CID, &pool.ContainerPollingParams{ CreationTimeout: 15 * time.Second, PollInterval: 3 * time.Second, }) - require.NoError(t, err) - return CID + return CID, err } func putObject(ctx context.Context, t *testing.T, clientPool pool.Pool, CID *cid.ID, content string, attributes map[string]string) *oid.ID {