[#365] Update SDK

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-04-07 17:36:44 +03:00 committed by Alex Vanin
parent 83fb697f80
commit 7710de39ec
9 changed files with 107 additions and 160 deletions

View file

@ -17,9 +17,9 @@ import (
"github.com/nspcc-dev/neofs-s3-gw/api/resolver"
"github.com/nspcc-dev/neofs-s3-gw/internal/neofstest"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/logger"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
type handlerContext struct {
@ -43,8 +43,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
l, err := logger.New(logger.WithTraceLevel("panic"))
require.NoError(t, err)
l := zap.NewNop()
tp := neofstest.NewTestNeoFS()
testResolver := &resolver.BucketResolver{Name: "test_resolver"}

View file

@ -1,9 +1,7 @@
package layer
import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"time"
@ -180,47 +178,13 @@ func (n *layer) setContainerEACLTable(ctx context.Context, idCnr *cid.ID, table
table.SetSessionToken(boxData.Gate.SessionTokenForSetEACL())
}
if err := n.neoFS.SetContainerEACL(ctx, *table); err != nil {
return err
}
return n.waitEACLPresence(ctx, *idCnr, table, defaultWaitParams())
return n.neoFS.SetContainerEACL(ctx, *table)
}
func (n *layer) GetContainerEACL(ctx context.Context, idCnr *cid.ID) (*eacl.Table, error) {
return n.neoFS.ContainerEACL(ctx, *idCnr)
}
type waitParams struct {
WaitTimeout time.Duration
PollInterval time.Duration
}
func defaultWaitParams() *waitParams {
return &waitParams{
WaitTimeout: 60 * time.Second,
PollInterval: 3 * time.Second,
}
}
func (n *layer) waitEACLPresence(ctx context.Context, idCnr cid.ID, table *eacl.Table, params *waitParams) error {
exp, err := table.Marshal()
if err != nil {
return fmt.Errorf("couldn't marshal eacl: %w", err)
}
return waitFor(ctx, params, func(ctx context.Context) bool {
eaclTable, err := n.neoFS.ContainerEACL(ctx, idCnr)
if err == nil {
got, err := eaclTable.Marshal()
if err == nil && bytes.Equal(exp, got) {
return true
}
}
return false
})
}
func (n *layer) deleteContainer(ctx context.Context, idCnr *cid.ID) error {
var sessionToken *session.Token
boxData, err := GetBoxData(ctx)
@ -228,43 +192,5 @@ func (n *layer) deleteContainer(ctx context.Context, idCnr *cid.ID) error {
sessionToken = boxData.Gate.SessionTokenForDelete()
}
if err = n.neoFS.DeleteContainer(ctx, *idCnr, sessionToken); err != nil {
return err
}
return n.waitForContainerRemoved(ctx, idCnr, defaultWaitParams())
}
func (n *layer) waitForContainerRemoved(ctx context.Context, idCnr *cid.ID, params *waitParams) error {
return waitFor(ctx, params, func(ctx context.Context) bool {
_, err := n.neoFS.Container(ctx, *idCnr)
// TODO: (neofs-s3-gw#367) handle NeoFS API status error
if err != nil && strings.Contains(err.Error(), "container not found") {
return true
}
return false
})
}
// waitFor await that given condition will be met in waitParams time.
func waitFor(ctx context.Context, params *waitParams, condition func(context.Context) bool) error {
wctx, cancel := context.WithTimeout(ctx, params.WaitTimeout)
defer cancel()
ticker := time.NewTimer(params.PollInterval)
defer ticker.Stop()
wdone := wctx.Done()
done := ctx.Done()
for {
select {
case <-done:
return ctx.Err()
case <-wdone:
return wctx.Err()
case <-ticker.C:
if condition(ctx) {
return nil
}
ticker.Reset(params.PollInterval)
}
}
return n.neoFS.DeleteContainer(ctx, *idCnr, sessionToken)
}

View file

@ -23,7 +23,6 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/session"
"go.uber.org/zap"
)
@ -332,15 +331,6 @@ func (n *layer) prepareAuthParameters(ctx context.Context, prm *neofs.PrmAuth) {
prm.PrivateKey = &n.anonKey.Key.PrivateKey
}
// CallOptions returns []pool.CallOption options: client.WithBearer or client.WithKey (if request is anonymous).
func (n *layer) CallOptions(ctx context.Context) []pool.CallOption {
if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil {
return []pool.CallOption{pool.WithBearer(bd.Gate.BearerToken)}
}
return []pool.CallOption{pool.WithKey(&n.anonKey.Key.PrivateKey)}
}
// GetBucketInfo returns bucket info by name.
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error) {
name, err := url.QueryUnescape(name)

View file

@ -453,18 +453,20 @@ func obtainSecret() *cli.Command {
func createNeoFS(ctx context.Context, log *zap.Logger, key *ecdsa.PrivateKey, peerAddress string) (authmate.NeoFS, error) {
log.Debug("prepare connection pool")
pb := new(pool.Builder)
pb.AddNode(peerAddress, 1, 1)
var prm pool.InitParameters
prm.SetKey(key)
prm.SetNodeDialTimeout(poolConnectTimeout)
prm.SetHealthcheckTimeout(poolRequestTimeout)
prm.AddNode(pool.NewNodeParam(1, peerAddress, 1))
opts := &pool.BuilderOptions{
Key: key,
NodeConnectionTimeout: poolConnectTimeout,
NodeRequestTimeout: poolRequestTimeout,
}
p, err := pb.Build(ctx, opts)
p, err := pool.NewPool(prm)
if err != nil {
return nil, err
}
if err = p.Dial(ctx); err != nil {
return nil, err
}
return neofs.NewAuthmateNeoFS(p), nil
}

View file

@ -59,7 +59,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
obj layer.Client
nc *notifications.Controller
poolPeers = fetchPeers(l, v)
prmPool pool.InitParameters
reBalance = defaultRebalanceTimer
conTimeout = defaultConnectTimeout
@ -104,17 +104,23 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
l.Info("using credentials",
zap.String("NeoFS", hex.EncodeToString(key.PublicKey().Bytes())))
opts := &pool.BuilderOptions{
Key: &key.PrivateKey,
NodeConnectionTimeout: conTimeout,
NodeRequestTimeout: reqTimeout,
ClientRebalanceInterval: reBalance,
prmPool.SetKey(&key.PrivateKey)
prmPool.SetNodeDialTimeout(conTimeout)
prmPool.SetHealthcheckTimeout(reqTimeout)
prmPool.SetClientRebalanceInterval(reBalance)
for _, peer := range fetchPeers(l, v) {
prmPool.AddNode(peer)
}
conns, err := poolPeers.Build(ctx, opts)
conns, err := pool.NewPool(prmPool)
if err != nil {
l.Fatal("failed to create connection pool", zap.Error(err))
}
if err = conns.Dial(ctx); err != nil {
l.Fatal("failed to dial connection pool", zap.Error(err))
}
// prepare random key for anonymous requests
randomKey, err := keys.NewPrivateKey()
if err != nil {

View file

@ -120,9 +120,8 @@ var ignore = map[string]struct{}{
cmdVersion: {},
}
func fetchPeers(l *zap.Logger, v *viper.Viper) *pool.Builder {
pb := new(pool.Builder)
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
var nodes []pool.NodeParam
for i := 0; ; i++ {
key := cfgPeers + "." + strconv.Itoa(i) + "."
address := v.GetString(key + "address")
@ -140,14 +139,14 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) *pool.Builder {
priority = 1
}
pb.AddNode(address, priority, weight)
nodes = append(nodes, pool.NewNodeParam(priority, address, weight))
l.Info("added connection peer",
zap.String("address", address),
zap.Float64("weight", weight))
}
return pb
return nodes
}
func fetchDomains(v *viper.Viper) []string {

2
go.mod
View file

@ -13,7 +13,7 @@ require (
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
github.com/nspcc-dev/neo-go v0.98.0
github.com/nspcc-dev/neofs-api-go/v2 v2.12.1
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d
github.com/prometheus/client_golang v1.11.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1

4
go.sum
View file

@ -298,8 +298,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BE
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3 h1:ofaiKPYY67a0cQMF+YSChDO48SBQtWlpZnK++cAeqQM=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3/go.mod h1:0hTXmyJnbw8j4BR1oltN7mFIIrVp1IFLdh8qBzAR464=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d h1:OHyq8+zyQtARFWj3quRPabcfQWJZEiU7HYp6QGCSjaM=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d/go.mod h1:Hl7a1l0ntZ4b1ZABpGX6fuAuFS3c6+hyMCUNVvZv/w4=
github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=

View file

@ -16,7 +16,6 @@ import (
"github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs"
"github.com/nspcc-dev/neofs-s3-gw/authmate"
"github.com/nspcc-dev/neofs-s3-gw/creds/tokens"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
@ -50,17 +49,11 @@ func (x *NeoFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64,
futureTime.Format(time.RFC3339), now.Format(time.RFC3339))
}
conn, _, err := x.pool.Connection()
if err != nil {
return 0, 0, fmt.Errorf("get connection from pool: %w", err)
}
res, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{})
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return 0, 0, fmt.Errorf("get network info via client: %w", err)
}
networkInfo := res.Info()
var durEpoch uint64
networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool {
@ -101,7 +94,10 @@ func (x *NeoFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64,
// Container implements neofs.NeoFS interface method.
func (x *NeoFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) {
res, err := x.pool.GetContainer(ctx, &idCnr)
var prm pool.PrmContainerGet
prm.SetContainerID(idCnr)
res, err := x.pool.GetContainer(ctx, prm)
if err != nil {
return nil, fmt.Errorf("read container via connection pool: %w", err)
}
@ -134,24 +130,24 @@ func (x *NeoFS) CreateContainer(ctx context.Context, prm neofs.PrmContainerCreat
container.SetNativeName(cnr, prm.Name)
}
var prmPut pool.PrmContainerPut
prmPut.SetContainer(*cnr)
// send request to save the container
idCnr, err := x.pool.PutContainer(ctx, cnr)
idCnr, err := x.pool.PutContainer(ctx, prmPut)
if err != nil {
return nil, fmt.Errorf("save container via connection pool: %w", err)
}
// wait the container to be persisted
err = x.pool.WaitForContainerPresence(ctx, idCnr, pool.DefaultPollingParams())
if err != nil {
return nil, fmt.Errorf("wait for container to be saved: %w", err)
}
return idCnr, nil
}
// UserContainers implements neofs.NeoFS interface method.
func (x *NeoFS) UserContainers(ctx context.Context, id owner.ID) ([]cid.ID, error) {
r, err := x.pool.ListContainers(ctx, &id)
var prm pool.PrmContainerList
prm.SetOwnerID(id)
r, err := x.pool.ListContainers(ctx, prm)
if err != nil {
return nil, fmt.Errorf("list user containers via connection pool: %w", err)
}
@ -161,7 +157,10 @@ func (x *NeoFS) UserContainers(ctx context.Context, id owner.ID) ([]cid.ID, erro
// SetContainerEACL implements neofs.NeoFS interface method.
func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table) error {
err := x.pool.SetEACL(ctx, &table)
var prm pool.PrmContainerSetEACL
prm.SetTable(table)
err := x.pool.SetEACL(ctx, prm)
if err != nil {
return fmt.Errorf("save eACL via connection pool: %w", err)
}
@ -171,7 +170,10 @@ func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table) error {
// ContainerEACL implements neofs.NeoFS interface method.
func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, error) {
res, err := x.pool.GetEACL(ctx, &id)
var prm pool.PrmContainerEACL
prm.SetContainerID(id)
res, err := x.pool.GetEACL(ctx, prm)
if err != nil {
return nil, fmt.Errorf("read eACL via connection pool: %w", err)
}
@ -181,7 +183,11 @@ func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, erro
// DeleteContainer implements neofs.NeoFS interface method.
func (x *NeoFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Token) error {
err := x.pool.DeleteContainer(ctx, &id, pool.WithSession(token))
var prm pool.PrmContainerDelete
prm.SetContainerID(id)
prm.SetSessionToken(*token)
err := x.pool.DeleteContainer(ctx, prm)
if err != nil {
return fmt.Errorf("delete container via connection pool: %w", err)
}
@ -231,15 +237,17 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm neofs.PrmObjectCreate) (*o
objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock))
}
var callOpt pool.CallOption
var prmPut pool.PrmObjectPut
prmPut.SetHeader(*obj)
prmPut.SetPayload(prm.Payload)
if prm.BearerToken != nil {
callOpt = pool.WithBearer(prm.BearerToken)
prmPut.UseBearer(prm.BearerToken)
} else {
callOpt = pool.WithKey(prm.PrivateKey)
prmPut.UseKey(prm.PrivateKey)
}
idObj, err := x.pool.PutObject(ctx, *obj, prm.Payload, callOpt)
idObj, err := x.pool.PutObject(ctx, prmPut)
if err != nil {
return nil, fmt.Errorf("save object via connection pool: %w", err)
}
@ -260,15 +268,17 @@ func (x *NeoFS) SelectObjects(ctx context.Context, prm neofs.PrmObjectSelect) ([
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
}
var callOpt pool.CallOption
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(prm.Container)
prmSearch.SetFilters(filters)
if prm.BearerToken != nil {
callOpt = pool.WithBearer(prm.BearerToken)
prmSearch.UseBearer(prm.BearerToken)
} else {
callOpt = pool.WithKey(prm.PrivateKey)
prmSearch.UseKey(prm.PrivateKey)
}
res, err := x.pool.SearchObjects(ctx, prm.Container, filters, callOpt)
res, err := x.pool.SearchObjects(ctx, prmSearch)
if err != nil {
return nil, fmt.Errorf("init object search via connection pool: %w", err)
}
@ -317,17 +327,18 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs
addr.SetContainerID(&prm.Container)
addr.SetObjectID(&prm.Object)
var callOpt pool.CallOption
var prmGet pool.PrmObjectGet
prmGet.SetAddress(addr)
if prm.BearerToken != nil {
callOpt = pool.WithBearer(prm.BearerToken)
prmGet.UseBearer(prm.BearerToken)
} else {
callOpt = pool.WithKey(prm.PrivateKey)
prmGet.UseKey(prm.PrivateKey)
}
if prm.WithHeader {
if prm.WithPayload {
res, err := x.pool.GetObject(ctx, addr, callOpt)
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
// TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return
if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") {
@ -351,7 +362,16 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs
}, nil
}
hdr, err := x.pool.HeadObject(ctx, addr, callOpt)
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
if prm.BearerToken != nil {
prmHead.UseBearer(prm.BearerToken)
} else {
prmHead.UseKey(prm.PrivateKey)
}
hdr, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
// TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return
if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") {
@ -365,7 +385,7 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs
Head: hdr,
}, nil
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, addr, callOpt)
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
// TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return
if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") {
@ -380,7 +400,18 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs
}, nil
}
res, err := x.pool.ObjectRange(ctx, addr, prm.PayloadRange[0], prm.PayloadRange[1], callOpt)
var prmRange pool.PrmObjectRange
prmRange.SetAddress(addr)
prmRange.SetOffset(prm.PayloadRange[0])
prmRange.SetLength(prm.PayloadRange[1])
if prm.BearerToken != nil {
prmRange.UseBearer(prm.BearerToken)
} else {
prmRange.UseKey(prm.PrivateKey)
}
res, err := x.pool.ObjectRange(ctx, prmRange)
if err != nil {
// TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return
if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") {
@ -401,15 +432,16 @@ func (x *NeoFS) DeleteObject(ctx context.Context, prm neofs.PrmObjectDelete) err
addr.SetContainerID(&prm.Container)
addr.SetObjectID(&prm.Object)
var callOpt pool.CallOption
var prmDelete pool.PrmObjectDelete
prmDelete.SetAddress(addr)
if prm.BearerToken != nil {
callOpt = pool.WithBearer(prm.BearerToken)
prmDelete.UseBearer(prm.BearerToken)
} else {
callOpt = pool.WithKey(prm.PrivateKey)
prmDelete.UseKey(prm.PrivateKey)
}
err := x.pool.DeleteObject(ctx, addr, callOpt)
err := x.pool.DeleteObject(ctx, prmDelete)
if err != nil {
// TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return
if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") {
@ -435,21 +467,14 @@ func NewResolverNeoFS(p *pool.Pool) *ResolverNeoFS {
// SystemDNS implements resolver.NeoFS interface method.
func (x *ResolverNeoFS) SystemDNS(ctx context.Context) (string, error) {
conn, _, err := x.pool.Connection()
if err != nil {
return "", fmt.Errorf("get connection from the pool: %w", err)
}
var prmCli client.PrmNetworkInfo
res, err := conn.NetworkInfo(ctx, prmCli)
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return "", fmt.Errorf("read network info via client: %w", err)
}
var domain string
res.Info().NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool {
networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool {
if string(parameter.Key()) == "SystemDNS" {
domain = string(parameter.Value())
return true