diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 2d53ab1..f01dd25 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -17,9 +17,9 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/resolver" "github.com/nspcc-dev/neofs-s3-gw/internal/neofstest" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/logger" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) type handlerContext struct { @@ -43,8 +43,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext { key, err := keys.NewPrivateKey() require.NoError(t, err) - l, err := logger.New(logger.WithTraceLevel("panic")) - require.NoError(t, err) + l := zap.NewNop() tp := neofstest.NewTestNeoFS() testResolver := &resolver.BucketResolver{Name: "test_resolver"} diff --git a/api/layer/container.go b/api/layer/container.go index d82d6f6..dce1a40 100644 --- a/api/layer/container.go +++ b/api/layer/container.go @@ -1,9 +1,7 @@ package layer import ( - "bytes" "context" - "fmt" "strconv" "strings" "time" @@ -180,47 +178,13 @@ func (n *layer) setContainerEACLTable(ctx context.Context, idCnr *cid.ID, table table.SetSessionToken(boxData.Gate.SessionTokenForSetEACL()) } - if err := n.neoFS.SetContainerEACL(ctx, *table); err != nil { - return err - } - - return n.waitEACLPresence(ctx, *idCnr, table, defaultWaitParams()) + return n.neoFS.SetContainerEACL(ctx, *table) } func (n *layer) GetContainerEACL(ctx context.Context, idCnr *cid.ID) (*eacl.Table, error) { return n.neoFS.ContainerEACL(ctx, *idCnr) } -type waitParams struct { - WaitTimeout time.Duration - PollInterval time.Duration -} - -func defaultWaitParams() *waitParams { - return &waitParams{ - WaitTimeout: 60 * time.Second, - PollInterval: 3 * time.Second, - } -} - -func (n *layer) waitEACLPresence(ctx context.Context, idCnr cid.ID, table *eacl.Table, params *waitParams) error { - exp, err := table.Marshal() - if err != nil { - return fmt.Errorf("couldn't marshal eacl: %w", err) - } - - return waitFor(ctx, params, func(ctx context.Context) bool { - eaclTable, err := n.neoFS.ContainerEACL(ctx, idCnr) - if err == nil { - got, err := eaclTable.Marshal() - if err == nil && bytes.Equal(exp, got) { - return true - } - } - return false - }) -} - func (n *layer) deleteContainer(ctx context.Context, idCnr *cid.ID) error { var sessionToken *session.Token boxData, err := GetBoxData(ctx) @@ -228,43 +192,5 @@ func (n *layer) deleteContainer(ctx context.Context, idCnr *cid.ID) error { sessionToken = boxData.Gate.SessionTokenForDelete() } - if err = n.neoFS.DeleteContainer(ctx, *idCnr, sessionToken); err != nil { - return err - } - - return n.waitForContainerRemoved(ctx, idCnr, defaultWaitParams()) -} - -func (n *layer) waitForContainerRemoved(ctx context.Context, idCnr *cid.ID, params *waitParams) error { - return waitFor(ctx, params, func(ctx context.Context) bool { - _, err := n.neoFS.Container(ctx, *idCnr) - // TODO: (neofs-s3-gw#367) handle NeoFS API status error - if err != nil && strings.Contains(err.Error(), "container not found") { - return true - } - return false - }) -} - -// waitFor await that given condition will be met in waitParams time. -func waitFor(ctx context.Context, params *waitParams, condition func(context.Context) bool) error { - wctx, cancel := context.WithTimeout(ctx, params.WaitTimeout) - defer cancel() - ticker := time.NewTimer(params.PollInterval) - defer ticker.Stop() - wdone := wctx.Done() - done := ctx.Done() - for { - select { - case <-done: - return ctx.Err() - case <-wdone: - return wctx.Err() - case <-ticker.C: - if condition(ctx) { - return nil - } - ticker.Reset(params.PollInterval) - } - } + return n.neoFS.DeleteContainer(ctx, *idCnr, sessionToken) } diff --git a/api/layer/layer.go b/api/layer/layer.go index 0a1be3a..df69b21 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -23,7 +23,6 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/netmap" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/owner" - "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/session" "go.uber.org/zap" ) @@ -332,15 +331,6 @@ func (n *layer) prepareAuthParameters(ctx context.Context, prm *neofs.PrmAuth) { prm.PrivateKey = &n.anonKey.Key.PrivateKey } -// CallOptions returns []pool.CallOption options: client.WithBearer or client.WithKey (if request is anonymous). -func (n *layer) CallOptions(ctx context.Context) []pool.CallOption { - if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil { - return []pool.CallOption{pool.WithBearer(bd.Gate.BearerToken)} - } - - return []pool.CallOption{pool.WithKey(&n.anonKey.Key.PrivateKey)} -} - // GetBucketInfo returns bucket info by name. func (n *layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error) { name, err := url.QueryUnescape(name) diff --git a/cmd/authmate/main.go b/cmd/authmate/main.go index 9bfab04..04f960d 100644 --- a/cmd/authmate/main.go +++ b/cmd/authmate/main.go @@ -453,18 +453,20 @@ func obtainSecret() *cli.Command { func createNeoFS(ctx context.Context, log *zap.Logger, key *ecdsa.PrivateKey, peerAddress string) (authmate.NeoFS, error) { log.Debug("prepare connection pool") - pb := new(pool.Builder) - pb.AddNode(peerAddress, 1, 1) + var prm pool.InitParameters + prm.SetKey(key) + prm.SetNodeDialTimeout(poolConnectTimeout) + prm.SetHealthcheckTimeout(poolRequestTimeout) + prm.AddNode(pool.NewNodeParam(1, peerAddress, 1)) - opts := &pool.BuilderOptions{ - Key: key, - NodeConnectionTimeout: poolConnectTimeout, - NodeRequestTimeout: poolRequestTimeout, - } - p, err := pb.Build(ctx, opts) + p, err := pool.NewPool(prm) if err != nil { return nil, err } + if err = p.Dial(ctx); err != nil { + return nil, err + } + return neofs.NewAuthmateNeoFS(p), nil } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index bbd2eca..a2b1f13 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -59,7 +59,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { obj layer.Client nc *notifications.Controller - poolPeers = fetchPeers(l, v) + prmPool pool.InitParameters reBalance = defaultRebalanceTimer conTimeout = defaultConnectTimeout @@ -104,17 +104,23 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { l.Info("using credentials", zap.String("NeoFS", hex.EncodeToString(key.PublicKey().Bytes()))) - opts := &pool.BuilderOptions{ - Key: &key.PrivateKey, - NodeConnectionTimeout: conTimeout, - NodeRequestTimeout: reqTimeout, - ClientRebalanceInterval: reBalance, + prmPool.SetKey(&key.PrivateKey) + prmPool.SetNodeDialTimeout(conTimeout) + prmPool.SetHealthcheckTimeout(reqTimeout) + prmPool.SetClientRebalanceInterval(reBalance) + for _, peer := range fetchPeers(l, v) { + prmPool.AddNode(peer) } - conns, err := poolPeers.Build(ctx, opts) + + conns, err := pool.NewPool(prmPool) if err != nil { l.Fatal("failed to create connection pool", zap.Error(err)) } + if err = conns.Dial(ctx); err != nil { + l.Fatal("failed to dial connection pool", zap.Error(err)) + } + // prepare random key for anonymous requests randomKey, err := keys.NewPrivateKey() if err != nil { diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 2262da2..183108d 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -120,9 +120,8 @@ var ignore = map[string]struct{}{ cmdVersion: {}, } -func fetchPeers(l *zap.Logger, v *viper.Viper) *pool.Builder { - pb := new(pool.Builder) - +func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam { + var nodes []pool.NodeParam for i := 0; ; i++ { key := cfgPeers + "." + strconv.Itoa(i) + "." address := v.GetString(key + "address") @@ -140,14 +139,14 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) *pool.Builder { priority = 1 } - pb.AddNode(address, priority, weight) + nodes = append(nodes, pool.NewNodeParam(priority, address, weight)) l.Info("added connection peer", zap.String("address", address), zap.Float64("weight", weight)) } - return pb + return nodes } func fetchDomains(v *viper.Viper) []string { diff --git a/go.mod b/go.mod index fc72e1a..1580933 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d github.com/nspcc-dev/neo-go v0.98.0 github.com/nspcc-dev/neofs-api-go/v2 v2.12.1 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3 + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d github.com/prometheus/client_golang v1.11.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.1 diff --git a/go.sum b/go.sum index ba10427..9e20a5b 100644 --- a/go.sum +++ b/go.sum @@ -298,8 +298,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BE github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3 h1:ofaiKPYY67a0cQMF+YSChDO48SBQtWlpZnK++cAeqQM= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3/go.mod h1:0hTXmyJnbw8j4BR1oltN7mFIIrVp1IFLdh8qBzAR464= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d h1:OHyq8+zyQtARFWj3quRPabcfQWJZEiU7HYp6QGCSjaM= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d/go.mod h1:Hl7a1l0ntZ4b1ZABpGX6fuAuFS3c6+hyMCUNVvZv/w4= github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 49ac07f..f3b483b 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -16,7 +16,6 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs" "github.com/nspcc-dev/neofs-s3-gw/authmate" "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" - "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" @@ -50,17 +49,11 @@ func (x *NeoFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64, futureTime.Format(time.RFC3339), now.Format(time.RFC3339)) } - conn, _, err := x.pool.Connection() - if err != nil { - return 0, 0, fmt.Errorf("get connection from pool: %w", err) - } - - res, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{}) + networkInfo, err := x.pool.NetworkInfo(ctx) if err != nil { return 0, 0, fmt.Errorf("get network info via client: %w", err) } - networkInfo := res.Info() var durEpoch uint64 networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { @@ -101,7 +94,10 @@ func (x *NeoFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64, // Container implements neofs.NeoFS interface method. func (x *NeoFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) { - res, err := x.pool.GetContainer(ctx, &idCnr) + var prm pool.PrmContainerGet + prm.SetContainerID(idCnr) + + res, err := x.pool.GetContainer(ctx, prm) if err != nil { return nil, fmt.Errorf("read container via connection pool: %w", err) } @@ -134,24 +130,24 @@ func (x *NeoFS) CreateContainer(ctx context.Context, prm neofs.PrmContainerCreat container.SetNativeName(cnr, prm.Name) } + var prmPut pool.PrmContainerPut + prmPut.SetContainer(*cnr) + // send request to save the container - idCnr, err := x.pool.PutContainer(ctx, cnr) + idCnr, err := x.pool.PutContainer(ctx, prmPut) if err != nil { return nil, fmt.Errorf("save container via connection pool: %w", err) } - // wait the container to be persisted - err = x.pool.WaitForContainerPresence(ctx, idCnr, pool.DefaultPollingParams()) - if err != nil { - return nil, fmt.Errorf("wait for container to be saved: %w", err) - } - return idCnr, nil } // UserContainers implements neofs.NeoFS interface method. func (x *NeoFS) UserContainers(ctx context.Context, id owner.ID) ([]cid.ID, error) { - r, err := x.pool.ListContainers(ctx, &id) + var prm pool.PrmContainerList + prm.SetOwnerID(id) + + r, err := x.pool.ListContainers(ctx, prm) if err != nil { return nil, fmt.Errorf("list user containers via connection pool: %w", err) } @@ -161,7 +157,10 @@ func (x *NeoFS) UserContainers(ctx context.Context, id owner.ID) ([]cid.ID, erro // SetContainerEACL implements neofs.NeoFS interface method. func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table) error { - err := x.pool.SetEACL(ctx, &table) + var prm pool.PrmContainerSetEACL + prm.SetTable(table) + + err := x.pool.SetEACL(ctx, prm) if err != nil { return fmt.Errorf("save eACL via connection pool: %w", err) } @@ -171,7 +170,10 @@ func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table) error { // ContainerEACL implements neofs.NeoFS interface method. func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, error) { - res, err := x.pool.GetEACL(ctx, &id) + var prm pool.PrmContainerEACL + prm.SetContainerID(id) + + res, err := x.pool.GetEACL(ctx, prm) if err != nil { return nil, fmt.Errorf("read eACL via connection pool: %w", err) } @@ -181,7 +183,11 @@ func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, erro // DeleteContainer implements neofs.NeoFS interface method. func (x *NeoFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Token) error { - err := x.pool.DeleteContainer(ctx, &id, pool.WithSession(token)) + var prm pool.PrmContainerDelete + prm.SetContainerID(id) + prm.SetSessionToken(*token) + + err := x.pool.DeleteContainer(ctx, prm) if err != nil { return fmt.Errorf("delete container via connection pool: %w", err) } @@ -231,15 +237,17 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm neofs.PrmObjectCreate) (*o objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock)) } - var callOpt pool.CallOption + var prmPut pool.PrmObjectPut + prmPut.SetHeader(*obj) + prmPut.SetPayload(prm.Payload) if prm.BearerToken != nil { - callOpt = pool.WithBearer(prm.BearerToken) + prmPut.UseBearer(prm.BearerToken) } else { - callOpt = pool.WithKey(prm.PrivateKey) + prmPut.UseKey(prm.PrivateKey) } - idObj, err := x.pool.PutObject(ctx, *obj, prm.Payload, callOpt) + idObj, err := x.pool.PutObject(ctx, prmPut) if err != nil { return nil, fmt.Errorf("save object via connection pool: %w", err) } @@ -260,15 +268,17 @@ func (x *NeoFS) SelectObjects(ctx context.Context, prm neofs.PrmObjectSelect) ([ filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix) } - var callOpt pool.CallOption + var prmSearch pool.PrmObjectSearch + prmSearch.SetContainerID(prm.Container) + prmSearch.SetFilters(filters) if prm.BearerToken != nil { - callOpt = pool.WithBearer(prm.BearerToken) + prmSearch.UseBearer(prm.BearerToken) } else { - callOpt = pool.WithKey(prm.PrivateKey) + prmSearch.UseKey(prm.PrivateKey) } - res, err := x.pool.SearchObjects(ctx, prm.Container, filters, callOpt) + res, err := x.pool.SearchObjects(ctx, prmSearch) if err != nil { return nil, fmt.Errorf("init object search via connection pool: %w", err) } @@ -317,17 +327,18 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs addr.SetContainerID(&prm.Container) addr.SetObjectID(&prm.Object) - var callOpt pool.CallOption + var prmGet pool.PrmObjectGet + prmGet.SetAddress(addr) if prm.BearerToken != nil { - callOpt = pool.WithBearer(prm.BearerToken) + prmGet.UseBearer(prm.BearerToken) } else { - callOpt = pool.WithKey(prm.PrivateKey) + prmGet.UseKey(prm.PrivateKey) } if prm.WithHeader { if prm.WithPayload { - res, err := x.pool.GetObject(ctx, addr, callOpt) + res, err := x.pool.GetObject(ctx, prmGet) if err != nil { // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { @@ -351,7 +362,16 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs }, nil } - hdr, err := x.pool.HeadObject(ctx, addr, callOpt) + var prmHead pool.PrmObjectHead + prmHead.SetAddress(addr) + + if prm.BearerToken != nil { + prmHead.UseBearer(prm.BearerToken) + } else { + prmHead.UseKey(prm.PrivateKey) + } + + hdr, err := x.pool.HeadObject(ctx, prmHead) if err != nil { // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { @@ -365,7 +385,7 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs Head: hdr, }, nil } else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 { - res, err := x.pool.GetObject(ctx, addr, callOpt) + res, err := x.pool.GetObject(ctx, prmGet) if err != nil { // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { @@ -380,7 +400,18 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm neofs.PrmObjectRead) (*neofs }, nil } - res, err := x.pool.ObjectRange(ctx, addr, prm.PayloadRange[0], prm.PayloadRange[1], callOpt) + var prmRange pool.PrmObjectRange + prmRange.SetAddress(addr) + prmRange.SetOffset(prm.PayloadRange[0]) + prmRange.SetLength(prm.PayloadRange[1]) + + if prm.BearerToken != nil { + prmRange.UseBearer(prm.BearerToken) + } else { + prmRange.UseKey(prm.PrivateKey) + } + + res, err := x.pool.ObjectRange(ctx, prmRange) if err != nil { // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { @@ -401,15 +432,16 @@ func (x *NeoFS) DeleteObject(ctx context.Context, prm neofs.PrmObjectDelete) err addr.SetContainerID(&prm.Container) addr.SetObjectID(&prm.Object) - var callOpt pool.CallOption + var prmDelete pool.PrmObjectDelete + prmDelete.SetAddress(addr) if prm.BearerToken != nil { - callOpt = pool.WithBearer(prm.BearerToken) + prmDelete.UseBearer(prm.BearerToken) } else { - callOpt = pool.WithKey(prm.PrivateKey) + prmDelete.UseKey(prm.PrivateKey) } - err := x.pool.DeleteObject(ctx, addr, callOpt) + err := x.pool.DeleteObject(ctx, prmDelete) if err != nil { // TODO: (neofs-s3-gw#367) use NeoFS SDK API to check the status return if strings.Contains(err.Error(), "access to operation") && strings.Contains(err.Error(), "is denied by") { @@ -435,21 +467,14 @@ func NewResolverNeoFS(p *pool.Pool) *ResolverNeoFS { // SystemDNS implements resolver.NeoFS interface method. func (x *ResolverNeoFS) SystemDNS(ctx context.Context) (string, error) { - conn, _, err := x.pool.Connection() - if err != nil { - return "", fmt.Errorf("get connection from the pool: %w", err) - } - - var prmCli client.PrmNetworkInfo - - res, err := conn.NetworkInfo(ctx, prmCli) + networkInfo, err := x.pool.NetworkInfo(ctx) if err != nil { return "", fmt.Errorf("read network info via client: %w", err) } var domain string - res.Info().NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { + networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { if string(parameter.Key()) == "SystemDNS" { domain = string(parameter.Value()) return true