[#126] downloader: Replace Read with Iterate on ObjectListReader

Make `Downloader.DownloadZipped` to call `Iterate` method instead of
`Read` one during processing the `ObjectListReader`.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-03-01 23:38:03 +03:00 committed by LeL
parent 2b7e4a36fb
commit c482bbd25a
4 changed files with 56 additions and 65 deletions

View file

@ -404,29 +404,10 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
optBearer := bearerOpts(c)
empty := true
n := 0
buf := make([]oid.ID, 10) // configure?
called := false
iterator:
for {
n, err = resSearch.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
if empty {
log.Error("objects not found", zap.Error(err))
response.Error(c, "objects not found", fasthttp.StatusNotFound)
return
}
err = nil
break
}
log.Error("read object list failed", zap.Error(err))
response.Error(c, "read object list failed", fasthttp.StatusBadRequest) // maybe best effort?
return
}
errIter := resSearch.Iterate(func(id oid.ID) bool {
called = true
if empty {
bufZip = make([]byte, 1024) // configure?
@ -434,39 +415,48 @@ iterator:
empty = false
for i := range buf[:n] {
addr.SetObjectID(&buf[i])
addr.SetObjectID(&id)
resGet, err = d.pool.GetObject(c, addr, optBearer)
if err != nil {
err = fmt.Errorf("get NeoFS object: %v", err)
break iterator
}
w, err = zipWriter.CreateHeader(&zip.FileHeader{
Name: getFilename(&resGet.Header),
Method: compression,
Modified: time.Now(),
})
if err != nil {
err = fmt.Errorf("zip create header: %v", err)
break iterator
}
_, err = io.CopyBuffer(w, resGet.Payload, bufZip)
if err != nil {
err = fmt.Errorf("copy object payload to zip file: %v", err)
break iterator
}
_ = resGet.Payload.Close()
err = zipWriter.Flush()
if err != nil {
err = fmt.Errorf("flush zip writer: %v", err)
break iterator
}
resGet, err = d.pool.GetObject(c, addr, optBearer)
if err != nil {
err = fmt.Errorf("get NeoFS object: %v", err)
return true
}
w, err = zipWriter.CreateHeader(&zip.FileHeader{
Name: getFilename(&resGet.Header),
Method: compression,
Modified: time.Now(),
})
if err != nil {
err = fmt.Errorf("zip create header: %v", err)
return true
}
_, err = io.CopyBuffer(w, resGet.Payload, bufZip)
if err != nil {
err = fmt.Errorf("copy object payload to zip file: %v", err)
return true
}
_ = resGet.Payload.Close()
err = zipWriter.Flush()
if err != nil {
err = fmt.Errorf("flush zip writer: %v", err)
return true
}
return false
})
if errIter != nil {
log.Error("iterating over selected objects failed", zap.Error(errIter))
response.Error(c, "iterating over selected objects", fasthttp.StatusBadRequest)
return
} else if !called {
log.Error("objects not found")
response.Error(c, "objects not found", fasthttp.StatusNotFound)
return
}
if err == nil {

4
go.mod
View file

@ -11,8 +11,8 @@ require (
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/nspcc-dev/neo-go v0.98.0
github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220221122137-66bc59da5c02
github.com/nspcc-dev/neofs-api-go/v2 v2.12.0
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220228071935-07817fb4032d
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.29.0
github.com/prometheus/procfs v0.7.1 // indirect

8
go.sum
View file

@ -589,15 +589,15 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1:
github.com/nspcc-dev/neo-go v0.98.0 h1:yyW4sgY88/pLf0949qmgfkQXzRKC3CI/WyhqXNnwMd8=
github.com/nspcc-dev/neo-go v0.98.0/go.mod h1:E3cc1x6RXSXrJb2nDWXTXjnXk3rIqVN8YdFyWv+FrqM=
github.com/nspcc-dev/neofs-api-go/v2 v2.11.0-pre.0.20211201134523-3604d96f3fe1/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs=
github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5 h1:y9tbmUYhcr052QXsa4/IfUKAi2cx3TGDsEZUAow3P/Y=
github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs=
github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 h1:xWqXzorDk9WFMTtWP7cwwlyJDL1X6Z4HT1e5zqkq7xY=
github.com/nspcc-dev/neofs-api-go/v2 v2.12.0/go.mod h1:oS8dycEh8PPf2Jjp6+8dlwWyEv2Dy77h/XhhcdxYEFs=
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220221122137-66bc59da5c02 h1:g9tIrZU45dVFUSiY7Bb8m43rV/CJiIoPgQrxnbtKfKE=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220221122137-66bc59da5c02/go.mod h1:NeDPJaKJ6yCOWXRmfc3aRrhBPEOeAPD7q/6bp1UQCbs=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220228071935-07817fb4032d h1:ku9s0XJ2LoWbB6nUjkyP7M8ki2nLOlMIvi4fAocf+iY=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220228071935-07817fb4032d/go.mod h1:/WV31AQHs6YLTjMgMjMZw8Z3/Q7b6kMjNgJVsRab5AU=
github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=

View file

@ -7,7 +7,6 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"mime/multipart"
"net/http"
"sort"
@ -47,7 +46,8 @@ func TestIntegration(t *testing.T) {
aioContainer := createDockerContainer(ctx, t, aioImage+version)
cancel := runServer()
clientPool := getPool(ctx, t, key)
CID := createContainer(ctx, t, clientPool)
CID, err := createContainer(ctx, t, clientPool)
require.NoError(t, err, version)
t.Run("simple put "+version, func(t *testing.T) { simplePut(ctx, t, clientPool, CID) })
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, CID) })
@ -285,7 +285,7 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) pool.Pool
return clientPool
}
func createContainer(ctx context.Context, t *testing.T, clientPool pool.Pool) *cid.ID {
func createContainer(ctx context.Context, t *testing.T, clientPool pool.Pool) (*cid.ID, error) {
pp, err := policy.Parse("REP 1")
require.NoError(t, err)
@ -297,16 +297,17 @@ func createContainer(ctx context.Context, t *testing.T, clientPool pool.Pool) *c
cnr.SetOwnerID(clientPool.OwnerID())
CID, err := clientPool.PutContainer(ctx, cnr)
require.NoError(t, err)
if err != nil {
return nil, err
}
fmt.Println(CID.String())
err = clientPool.WaitForContainerPresence(ctx, CID, &pool.ContainerPollingParams{
CreationTimeout: 15 * time.Second,
PollInterval: 3 * time.Second,
})
require.NoError(t, err)
return CID
return CID, err
}
func putObject(ctx context.Context, t *testing.T, clientPool pool.Pool, CID *cid.ID, content string, attributes map[string]string) *oid.ID {