From 5080b43a04c46b7e846bd68463a7dc9cc925f27d Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 7 Apr 2022 15:56:18 +0300 Subject: [PATCH] [#139] Update SDK Signed-off-by: Denis Kirillov --- app.go | 31 ++++++++++++++++-------- downloader/download.go | 28 +++++++++++++++------ downloader/head.go | 16 +++++++++--- go.mod | 2 +- go.sum | 4 +-- integration_test.go | 42 ++++++++++++++++++++------------ uploader/upload.go | 55 +++++++++++++++++++++--------------------- 7 files changed, 111 insertions(+), 67 deletions(-) diff --git a/app.go b/app.go index 63334cd..487d356 100644 --- a/app.go +++ b/app.go @@ -9,6 +9,7 @@ import ( "github.com/fasthttp/router" "github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/cli/input" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-http-gw/downloader" @@ -92,7 +93,13 @@ func newApp(ctx context.Context, opt ...Option) App { if err != nil { a.log.Fatal("failed to get neofs credentials", zap.Error(err)) } - pb := new(pool.Builder) + + var prm pool.InitParameters + prm.SetKey(key) + prm.SetNodeDialTimeout(a.cfg.GetDuration(cfgConTimeout)) + prm.SetHealthcheckTimeout(a.cfg.GetDuration(cfgReqTimeout)) + prm.SetClientRebalanceInterval(a.cfg.GetDuration(cfgRebalance)) + for i := 0; ; i++ { address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address") weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight") @@ -106,20 +113,20 @@ func newApp(ctx context.Context, opt ...Option) App { if priority <= 0 { // unspecified or wrong priority = 1 } - pb.AddNode(address, priority, weight) + prm.AddNode(pool.NewNodeParam(priority, address, weight)) a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight), zap.Int("priority", priority)) } - opts := &pool.BuilderOptions{ - Key: key, - NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout), - NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout), - ClientRebalanceInterval: a.cfg.GetDuration(cfgRebalance), - } - a.pool, err = pb.Build(ctx, opts) + + a.pool, err = pool.NewPool(prm) if err != nil { a.log.Fatal("failed to create connection pool", zap.Error(err)) } + + err = a.pool.Dial(ctx) + if err != nil { + a.log.Fatal("failed to dial pool", zap.Error(err)) + } return a } @@ -127,7 +134,11 @@ func getNeoFSKey(a *app) (*ecdsa.PrivateKey, error) { walletPath := a.cfg.GetString(cmdWallet) if len(walletPath) == 0 { a.log.Info("no wallet path specified, creating ephemeral key automatically for this run") - return pool.NewEphemeralKey() + key, err := keys.NewPrivateKey() + if err != nil { + return nil, err + } + return &key.PrivateKey, nil } w, err := wallet.NewWalletFromFile(walletPath) if err != nil { diff --git a/downloader/download.go b/downloader/download.go index 3599119..3221c45 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/pool" + "github.com/nspcc-dev/neofs-sdk-go/token" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -97,7 +98,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress *address.Address) { return } - rObj, err := clnt.GetObject(r.RequestCtx, *objectAddress, bearerOpts(r.RequestCtx)) + var prm pool.PrmObjectGet + prm.SetAddress(*objectAddress) + prm.UseBearer(bearerToken(r.RequestCtx)) + + rObj, err := clnt.GetObject(r.RequestCtx, prm) if err != nil { r.handleNeoFSErr(err, start) return @@ -195,11 +200,11 @@ func systemBackwardTranslator(key string) string { return res.String() } -func bearerOpts(ctx context.Context) pool.CallOption { +func bearerToken(ctx context.Context) *token.BearerToken { if tkn, err := tokens.LoadBearerToken(ctx); err == nil { - return pool.WithBearer(tkn) + return tkn } - return pool.WithBearer(nil) + return nil } func (r *request) handleNeoFSErr(err error, start time.Time) { @@ -335,7 +340,12 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string filters.AddRootFilter() filters.AddFilter(key, val, op) - return d.pool.SearchObjects(c, *cid, filters) + var prm pool.PrmObjectSearch + prm.SetContainerID(*cid) + prm.SetFilters(filters) + prm.UseBearer(bearerToken(c)) + + return d.pool.SearchObjects(c, prm) } // DownloadZipped handles zip by prefix requests. @@ -385,7 +395,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { addr.SetContainerID(containerID) - optBearer := bearerOpts(c) + btoken := bearerToken(c) empty := true called := false @@ -400,7 +410,11 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { addr.SetObjectID(&id) - resGet, err = d.pool.GetObject(c, addr, optBearer) + var prm pool.PrmObjectGet + prm.SetAddress(addr) + prm.UseBearer(btoken) + + resGet, err = d.pool.GetObject(c, prm) if err != nil { err = fmt.Errorf("get NeoFS object: %v", err) return true diff --git a/downloader/head.go b/downloader/head.go index ddcc770..a299d31 100644 --- a/downloader/head.go +++ b/downloader/head.go @@ -33,8 +33,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) { return } - bearerOpt := bearerOpts(r.RequestCtx) - obj, err := clnt.HeadObject(r.RequestCtx, *objectAddress, bearerOpt) + btoken := bearerToken(r.RequestCtx) + + var prm pool.PrmObjectHead + prm.SetAddress(*objectAddress) + prm.UseBearer(btoken) + + obj, err := clnt.HeadObject(r.RequestCtx, prm) if err != nil { r.handleNeoFSErr(err, start) return @@ -69,7 +74,12 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) { if len(contentType) == 0 { contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) { - return clnt.ObjectRange(r.RequestCtx, *objectAddress, 0, sz, bearerOpt) + var prmRange pool.PrmObjectRange + prmRange.SetAddress(*objectAddress) + prmRange.SetLength(sz) + prmRange.UseBearer(btoken) + + return clnt.ObjectRange(r.RequestCtx, prmRange) }) if err != nil && err != io.EOF { r.handleNeoFSErr(err, start) diff --git a/go.mod b/go.mod index 046ec64..bf8c871 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/nspcc-dev/neo-go v0.98.0 github.com/nspcc-dev/neofs-api-go/v2 v2.12.1 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3 + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.29.0 github.com/prometheus/procfs v0.7.1 // indirect diff --git a/go.sum b/go.sum index ab2b770..70dc092 100644 --- a/go.sum +++ b/go.sum @@ -596,8 +596,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BE github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3 h1:ofaiKPYY67a0cQMF+YSChDO48SBQtWlpZnK++cAeqQM= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3/go.mod h1:0hTXmyJnbw8j4BR1oltN7mFIIrVp1IFLdh8qBzAR464= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d h1:OHyq8+zyQtARFWj3quRPabcfQWJZEiU7HYp6QGCSjaM= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d/go.mod h1:Hl7a1l0ntZ4b1ZABpGX6fuAuFS3c6+hyMCUNVvZv/w4= github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= diff --git a/integration_test.go b/integration_test.go index 882c138..e1d15ae 100644 --- a/integration_test.go +++ b/integration_test.go @@ -119,7 +119,10 @@ func simplePut(ctx context.Context, t *testing.T, clientPool *pool.Pool, CID *ci payload := bytes.NewBuffer(nil) - res, err := clientPool.GetObject(ctx, *objectAddress) + var prm pool.PrmObjectGet + prm.SetAddress(*objectAddress) + + res, err := clientPool.GetObject(ctx, prm) require.NoError(t, err) _, err = io.Copy(payload, res.Payload) @@ -272,15 +275,15 @@ func getDefaultConfig() *viper.Viper { } func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool { - pb := new(pool.Builder) - pb.AddNode("localhost:8080", 1, 1) + var prm pool.InitParameters + prm.SetKey(&key.PrivateKey) + prm.SetNodeDialTimeout(5 * time.Second) + prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1)) - opts := &pool.BuilderOptions{ - Key: &key.PrivateKey, - NodeConnectionTimeout: 5 * time.Second, - NodeRequestTimeout: 5 * time.Second, - } - clientPool, err := pb.Build(ctx, opts) + clientPool, err := pool.NewPool(prm) + require.NoError(t, err) + + err = clientPool.Dial(ctx) require.NoError(t, err) return clientPool } @@ -296,17 +299,20 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool) ( container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10))) cnr.SetOwnerID(clientPool.OwnerID()) - CID, err := clientPool.PutContainer(ctx, cnr) + var waitPrm pool.WaitParams + waitPrm.SetTimeout(15 * time.Second) + waitPrm.SetPollInterval(3 * time.Second) + + var prm pool.PrmContainerPut + prm.SetContainer(*cnr) + prm.SetWaitParams(waitPrm) + + CID, err := clientPool.PutContainer(ctx, prm) if err != nil { return nil, err } fmt.Println(CID.String()) - err = clientPool.WaitForContainerPresence(ctx, CID, &pool.ContainerPollingParams{ - CreationTimeout: 15 * time.Second, - PollInterval: 3 * time.Second, - }) - return CID, err } @@ -324,7 +330,11 @@ func putObject(ctx context.Context, t *testing.T, clientPool *pool.Pool, CID *ci } obj.SetAttributes(attrs...) - id, err := clientPool.PutObject(ctx, *obj, bytes.NewBufferString(content)) + var prm pool.PrmObjectPut + prm.SetHeader(*obj) + prm.SetPayload(bytes.NewBufferString(content)) + + id, err := clientPool.PutObject(ctx, prm) require.NoError(t, err) return id diff --git a/uploader/upload.go b/uploader/upload.go index 4b094a5..7b0798b 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -12,8 +12,6 @@ import ( "github.com/nspcc-dev/neofs-http-gw/response" "github.com/nspcc-dev/neofs-http-gw/tokens" "github.com/nspcc-dev/neofs-http-gw/utils" - "github.com/nspcc-dev/neofs-sdk-go/client" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -139,7 +137,12 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { ctx, cancel := context.WithCancel(c) defer cancel() - if idObj, err = u.pool.PutObject(ctx, *obj, file, pool.WithBearer(bt)); err != nil { + var prm pool.PrmObjectPut + prm.SetHeader(*obj) + prm.SetPayload(file) + prm.UseBearer(bt) + + if idObj, err = u.pool.PutObject(ctx, prm); err != nil { log.Error("could not store file in neofs", zap.Error(err)) response.Error(c, "could not store file in neofs", fasthttp.StatusBadRequest) return @@ -198,33 +201,29 @@ func (pr *putResponse) encode(w io.Writer) error { } func getEpochDurations(ctx context.Context, p *pool.Pool) (*epochDurations, error) { - if conn, _, err := p.Connection(); err != nil { + networkInfo, err := p.NetworkInfo(ctx) + if err != nil { return nil, err - } else if networkInfoRes, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{}); err != nil { - return nil, err - } else if err = apistatus.ErrFromStatus(networkInfoRes.Status()); err != nil { - return nil, err - } else { - networkInfo := networkInfoRes.Info() - res := &epochDurations{ - currentEpoch: networkInfo.CurrentEpoch(), - msPerBlock: networkInfo.MsPerBlock(), - } - - networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { - if string(parameter.Key()) == "EpochDuration" { - data := make([]byte, 8) - copy(data, parameter.Value()) - res.blockPerEpoch = binary.LittleEndian.Uint64(data) - return true - } - return false - }) - if res.blockPerEpoch == 0 { - return nil, fmt.Errorf("not found param: EpochDuration") - } - return res, nil } + + res := &epochDurations{ + currentEpoch: networkInfo.CurrentEpoch(), + msPerBlock: networkInfo.MsPerBlock(), + } + + networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { + if string(parameter.Key()) == "EpochDuration" { + data := make([]byte, 8) + copy(data, parameter.Value()) + res.blockPerEpoch = binary.LittleEndian.Uint64(data) + return true + } + return false + }) + if res.blockPerEpoch == 0 { + return nil, fmt.Errorf("not found param: EpochDuration") + } + return res, nil } func needParseExpiration(headers map[string]string) bool {