[#166] v2/netmap: add v2 structures for netmap service

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-10-08 14:22:28 +03:00 committed by Alex Vanin
parent fa18f5ede7
commit a29b615522
8 changed files with 555 additions and 1 deletions

159
v2/netmap/client.go Normal file
View file

@ -0,0 +1,159 @@
package netmap
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/client"
netmap "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Client represents universal netmap transport client.
type Client struct {
cLocalNodeInfo *localNodeInfoClient
}
// Option represents Client option.
type Option func(*cfg)
type cfg struct {
proto client.Protocol
globalOpts []client.Option
gRPC cfgGRPC
}
type cfgGRPC struct {
serviceClient netmap.NetmapServiceClient
grpcCallOpts []grpc.CallOption
callOpts []netmap.Option
client *netmap.Client
}
type localNodeInfoClient struct {
requestConverter func(*LocalNodeInfoRequest) interface{}
caller func(context.Context, interface{}) (interface{}, error)
responseConverter func(interface{}) *LocalNodeInfoResponse
}
// LocalNodeInfo sends LocalNodeInfoRequest over the network.
func (c *Client) LocalNodeInfo(ctx context.Context, req *LocalNodeInfoRequest) (*LocalNodeInfoResponse, error) {
resp, err := c.cLocalNodeInfo.caller(ctx, c.cLocalNodeInfo.requestConverter(req))
if err != nil {
return nil, errors.Wrap(err, "could not send container put request")
}
return c.cLocalNodeInfo.responseConverter(resp), nil
}
func defaultCfg() *cfg {
return &cfg{
proto: client.ProtoGRPC,
}
}
// NewClient is a constructor for netmap transport client.
func NewClient(opts ...Option) (*Client, error) {
cfg := defaultCfg()
for i := range opts {
opts[i](cfg)
}
var err error
switch cfg.proto {
case client.ProtoGRPC:
var c *netmap.Client
if c, err = newGRPCClient(cfg); err != nil {
break
}
return &Client{
cLocalNodeInfo: &localNodeInfoClient{
requestConverter: func(req *LocalNodeInfoRequest) interface{} {
return LocalNodeInfoRequestToGRPCMessage(req)
},
caller: func(ctx context.Context, req interface{}) (interface{}, error) {
return c.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest))
},
responseConverter: func(resp interface{}) *LocalNodeInfoResponse {
return LocalNodeInfoResponseFromGRPCMessage(resp.(*netmap.LocalNodeInfoResponse))
},
},
}, nil
default:
err = client.ErrProtoUnsupported
}
return nil, errors.Wrapf(err, "could not create %s Session client", cfg.proto)
}
func newGRPCClient(cfg *cfg) (*netmap.Client, error) {
var err error
if cfg.gRPC.client == nil {
if cfg.gRPC.serviceClient == nil {
conn, err := client.NewGRPCClientConn(cfg.globalOpts...)
if err != nil {
return nil, errors.Wrap(err, "could not open gRPC client connection")
}
cfg.gRPC.serviceClient = netmap.NewNetmapServiceClient(conn)
}
cfg.gRPC.client, err = netmap.NewClient(
cfg.gRPC.serviceClient,
append(
cfg.gRPC.callOpts,
netmap.WithCallOptions(cfg.gRPC.grpcCallOpts),
)...,
)
}
return cfg.gRPC.client, err
}
// WithGlobalOpts sets global client options to client.
func WithGlobalOpts(v ...client.Option) Option {
return func(c *cfg) {
if len(v) > 0 {
c.globalOpts = v
}
}
}
// WithGRPCServiceClient sets existing service client.
func WithGRPCServiceClient(v netmap.NetmapServiceClient) Option {
return func(c *cfg) {
c.gRPC.serviceClient = v
}
}
// WithGRPCServiceClient sets GRPC specific call options.
func WithGRPCCallOpts(v []grpc.CallOption) Option {
return func(c *cfg) {
c.gRPC.grpcCallOpts = v
}
}
// WithGRPCServiceClient sets GRPC specific client options.
func WithGRPCClientOpts(v []netmap.Option) Option {
return func(c *cfg) {
c.gRPC.callOpts = v
}
}
// WithGRPCServiceClient sets existing GRPC client.
func WithGRPCClient(v *netmap.Client) Option {
return func(c *cfg) {
c.gRPC.client = v
}
}

View file

@ -2,6 +2,8 @@ package netmap
import (
netmap "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/session"
)
func FilterToGRPCMessage(f *Filter) *netmap.Filter {
@ -259,3 +261,96 @@ func NodeInfoFromGRPCMessage(m *netmap.NodeInfo) *NodeInfo {
return a
}
func LocalNodeInfoRequestBodyToGRPCMessage(r *LocalNodeInfoRequestBody) *netmap.LocalNodeInfoRequest_Body {
if r == nil {
return nil
}
return new(netmap.LocalNodeInfoRequest_Body)
}
func LocalNodeInfoRequestBodyFromGRPCMessage(m *netmap.LocalNodeInfoRequest_Body) *LocalNodeInfoRequestBody {
if m == nil {
return nil
}
return new(LocalNodeInfoRequestBody)
}
func LocalNodeInfoResponseBodyToGRPCMessage(r *LocalNodeInfoResponseBody) *netmap.LocalNodeInfoResponse_Body {
if r == nil {
return nil
}
m := new(netmap.LocalNodeInfoResponse_Body)
m.SetVersion(refs.VersionToGRPCMessage(r.GetVersion()))
m.SetNodeInfo(NodeInfoToGRPCMessage(r.GetNodeInfo()))
return m
}
func LocalNodeInfoResponseBodyFromGRPCMessage(m *netmap.LocalNodeInfoResponse_Body) *LocalNodeInfoResponseBody {
if m == nil {
return nil
}
r := new(LocalNodeInfoResponseBody)
r.SetVersion(refs.VersionFromGRPCMessage(m.GetVersion()))
r.SetNodeInfo(NodeInfoFromGRPCMessage(m.GetNodeInfo()))
return r
}
func LocalNodeInfoRequestToGRPCMessage(r *LocalNodeInfoRequest) *netmap.LocalNodeInfoRequest {
if r == nil {
return nil
}
m := new(netmap.LocalNodeInfoRequest)
m.SetBody(LocalNodeInfoRequestBodyToGRPCMessage(r.GetBody()))
session.RequestHeadersToGRPC(r, m)
return m
}
func LocalNodeInfoRequestFromGRPCMessage(m *netmap.LocalNodeInfoRequest) *LocalNodeInfoRequest {
if m == nil {
return nil
}
r := new(LocalNodeInfoRequest)
r.SetBody(LocalNodeInfoRequestBodyFromGRPCMessage(m.GetBody()))
session.RequestHeadersFromGRPC(m, r)
return r
}
func LocalNodeInfoResponseToGRPCMessage(r *LocalNodeInfoResponse) *netmap.LocalNodeInfoResponse {
if r == nil {
return nil
}
m := new(netmap.LocalNodeInfoResponse)
m.SetBody(LocalNodeInfoResponseBodyToGRPCMessage(r.GetBody()))
session.ResponseHeadersToGRPC(r, m)
return m
}
func LocalNodeInfoResponseFromGRPCMessage(m *netmap.LocalNodeInfoResponse) *LocalNodeInfoResponse {
if m == nil {
return nil
}
r := new(LocalNodeInfoResponse)
r.SetBody(LocalNodeInfoResponseBodyFromGRPCMessage(m.GetBody()))
session.ResponseHeadersFromGRPC(m, r)
return r
}

62
v2/netmap/grpc/client.go Normal file
View file

@ -0,0 +1,62 @@
package netmap
import (
"context"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Client wraps NetmapServiceClient
// with pre-defined configurations.
type Client struct {
*cfg
client NetmapServiceClient
}
// Option represents Client option.
type Option func(*cfg)
type cfg struct {
callOpts []grpc.CallOption
}
// ErrNilNetmapServiceClient is returned by functions that expect
// a non-nil ContainerServiceClient, but received nil.
var ErrNilNetmapServiceClient = errors.New("netmap gRPC client is nil")
func defaultCfg() *cfg {
return new(cfg)
}
// NewClient creates, initializes and returns a new Client instance.
//
// Options are applied one by one in order.
func NewClient(c NetmapServiceClient, opts ...Option) (*Client, error) {
if c == nil {
return nil, ErrNilNetmapServiceClient
}
cfg := defaultCfg()
for i := range opts {
opts[i](cfg)
}
return &Client{
cfg: cfg,
client: c,
}, nil
}
func (c *Client) LocalNodeInfo(ctx context.Context, req *LocalNodeInfoRequest) (*LocalNodeInfoResponse, error) {
return c.client.LocalNodeInfo(ctx, req, c.callOpts...)
}
// WithCallOptions returns Option that configures
// Client to attach call options to each rpc call.
func WithCallOptions(opts []grpc.CallOption) Option {
return func(c *cfg) {
c.callOpts = opts
}
}

View file

@ -33,6 +33,9 @@ const (
addressNodeInfoField = 2
attributesNodeInfoField = 3
stateNodeInfoField = 4
versionInfoResponseBodyField = 1
nodeInfoResponseBodyField = 2
)
func (f *Filter) StableMarshal(buf []byte) ([]byte, error) {
@ -381,3 +384,51 @@ func (ni *NodeInfo) StableSize() (size int) {
return size
}
func (l *LocalNodeInfoRequestBody) StableMarshal(buf []byte) ([]byte, error) {
return nil, nil
}
func (l *LocalNodeInfoRequestBody) StableSize() (size int) {
return 0
}
func (l *LocalNodeInfoResponseBody) StableMarshal(buf []byte) ([]byte, error) {
if l == nil {
return []byte{}, nil
}
if buf == nil {
buf = make([]byte, l.StableSize())
}
var (
offset, n int
err error
)
n, err = proto.NestedStructureMarshal(versionInfoResponseBodyField, buf[offset:], l.version)
if err != nil {
return nil, err
}
offset += n
_, err = proto.NestedStructureMarshal(nodeInfoResponseBodyField, buf[offset:], l.nodeInfo)
if err != nil {
return nil, err
}
return buf, nil
}
func (l *LocalNodeInfoResponseBody) StableSize() (size int) {
if l == nil {
return 0
}
size += proto.NestedStructureSize(versionInfoResponseBodyField, l.version)
size += proto.NestedStructureSize(nodeInfoResponseBodyField, l.nodeInfo)
return size
}

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
grpc "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/stretchr/testify/require"
)
@ -89,7 +90,7 @@ func TestReplica_StableMarshal(t *testing.T) {
})
}
func TestPlacementPolicy_StableSize(t *testing.T) {
func TestPlacementPolicy_StableMarshal(t *testing.T) {
from := generatePolicy(3)
transport := new(grpc.PlacementPolicy)
@ -105,6 +106,22 @@ func TestPlacementPolicy_StableSize(t *testing.T) {
})
}
func TestLocalNodeInfoResponseBody_StableMarshal(t *testing.T) {
from := generateNodeInfoResponseBody()
transport := new(grpc.LocalNodeInfoResponse_Body)
t.Run("non empty", func(t *testing.T) {
wire, err := from.StableMarshal(nil)
require.NoError(t, err)
err = transport.Unmarshal(wire)
require.NoError(t, err)
to := netmap.LocalNodeInfoResponseBodyFromGRPCMessage(transport)
require.Equal(t, from, to)
})
}
func generateAttribute(k, v string) *netmap.Attribute {
attr := new(netmap.Attribute)
attr.SetKey(k)
@ -189,3 +206,21 @@ func generatePolicy(n int) *netmap.PlacementPolicy {
return p
}
func generateNodeInfoResponseBody() *netmap.LocalNodeInfoResponseBody {
ni := generateNodeInfo("key", "/multi/addr", 2)
r := new(netmap.LocalNodeInfoResponseBody)
r.SetVersion(generateVersion(2, 1))
r.SetNodeInfo(ni)
return r
}
func generateVersion(maj, min uint32) *refs.Version {
version := new(refs.Version)
version.SetMajor(maj)
version.SetMinor(min)
return version
}

27
v2/netmap/service.go Normal file
View file

@ -0,0 +1,27 @@
package netmap
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/session"
)
type Service interface {
LocalNodeInfo(ctx context.Context, request *LocalNodeInfoRequest) (*LocalNodeInfoResponse, error)
}
type LocalNodeInfoRequest struct {
body *LocalNodeInfoRequestBody
metaHeader *session.RequestMetaHeader
verifyHeader *session.RequestVerificationHeader
}
type LocalNodeInfoResponse struct {
body *LocalNodeInfoResponseBody
metaHeader *session.ResponseMetaHeader
verifyHeader *session.ResponseVerificationHeader
}

View file

@ -1,5 +1,10 @@
package netmap
import (
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/session"
)
type Filter struct {
name string
key string
@ -51,6 +56,13 @@ type NodeState uint32
// Clause of placement selector.
type Clause uint32
type LocalNodeInfoRequestBody struct{}
type LocalNodeInfoResponseBody struct {
version *refs.Version
nodeInfo *NodeInfo
}
const (
UnspecifiedState NodeState = iota
Online
@ -389,3 +401,109 @@ func (ni *NodeInfo) SetState(state NodeState) {
ni.state = state
}
}
func (l *LocalNodeInfoResponseBody) GetVersion() *refs.Version {
if l != nil {
return l.version
}
return nil
}
func (l *LocalNodeInfoResponseBody) SetVersion(version *refs.Version) {
if l != nil {
l.version = version
}
}
func (l *LocalNodeInfoResponseBody) GetNodeInfo() *NodeInfo {
if l != nil {
return l.nodeInfo
}
return nil
}
func (l *LocalNodeInfoResponseBody) SetNodeInfo(nodeInfo *NodeInfo) {
if l != nil {
l.nodeInfo = nodeInfo
}
}
func (l *LocalNodeInfoRequest) GetBody() *LocalNodeInfoRequestBody {
if l != nil {
return l.body
}
return nil
}
func (l *LocalNodeInfoRequest) SetBody(body *LocalNodeInfoRequestBody) {
if l != nil {
l.body = body
}
}
func (l *LocalNodeInfoRequest) GetMetaHeader() *session.RequestMetaHeader {
if l != nil {
return l.metaHeader
}
return nil
}
func (l *LocalNodeInfoRequest) SetMetaHeader(metaHeader *session.RequestMetaHeader) {
if l != nil {
l.metaHeader = metaHeader
}
}
func (l *LocalNodeInfoRequest) GetVerificationHeader() *session.RequestVerificationHeader {
if l != nil {
return l.verifyHeader
}
return nil
}
func (l *LocalNodeInfoRequest) SetVerificationHeader(verifyHeader *session.RequestVerificationHeader) {
if l != nil {
l.verifyHeader = verifyHeader
}
}
func (l *LocalNodeInfoResponse) GetBody() *LocalNodeInfoResponseBody {
if l != nil {
return l.body
}
return nil
}
func (l *LocalNodeInfoResponse) SetBody(body *LocalNodeInfoResponseBody) {
if l != nil {
l.body = body
}
}
func (l *LocalNodeInfoResponse) GetMetaHeader() *session.ResponseMetaHeader {
if l != nil {
return l.metaHeader
}
return nil
}
func (l *LocalNodeInfoResponse) SetMetaHeader(metaHeader *session.ResponseMetaHeader) {
if l != nil {
l.metaHeader = metaHeader
}
}
func (l *LocalNodeInfoResponse) GetVerificationHeader() *session.ResponseVerificationHeader {
if l != nil {
return l.verifyHeader
}
return nil
}
func (l *LocalNodeInfoResponse) SetVerificationHeader(verifyHeader *session.ResponseVerificationHeader) {
if l != nil {
l.verifyHeader = verifyHeader
}
}

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/util/signature"
"github.com/nspcc-dev/neofs-api-go/v2/accounting"
"github.com/nspcc-dev/neofs-api-go/v2/container"
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/session"
@ -359,5 +360,11 @@ func serviceMessageBody(req interface{}) stableMarshaler {
return v.GetBody()
case *object.GetRangeHashResponse:
return v.GetBody()
/* Netmap */
case *netmap.LocalNodeInfoRequest:
return v.GetBody()
case *netmap.LocalNodeInfoResponse:
return v.GetBody()
}
}