[#276] Merge repo with frostfs-api-go

Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
Pavel Pogodaev 2024-10-07 17:20:25 +03:00 committed by pogpp
parent 5361f0eceb
commit 6ce73790ea
337 changed files with 66666 additions and 283 deletions

29
api/rpc/accounting.go Normal file
View file

@ -0,0 +1,29 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
)
const serviceAccounting = serviceNamePrefix + "accounting.AccountingService"
const (
rpcAccountingBalance = "Balance"
)
// Balance executes AccountingService.Balance RPC.
func Balance(
cli *client.Client,
req *accounting.BalanceRequest,
opts ...client.CallOption,
) (*accounting.BalanceResponse, error) {
resp := new(accounting.BalanceResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceAccounting, rpcAccountingBalance), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

60
api/rpc/apemanager.go Normal file
View file

@ -0,0 +1,60 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/apemanager"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
)
const serviceAPEManager = frostfsServiceNamePrefix + "apemanager.APEManagerService"
const (
rpcAPEManagerAddChain = "AddChain"
rpcAPEManagerRemoveChain = "RemoveChain"
rpcAPEManagerListChains = "ListChains"
)
func AddChain(
cli *client.Client,
req *apemanager.AddChainRequest,
opts ...client.CallOption,
) (*apemanager.AddChainResponse, error) {
resp := new(apemanager.AddChainResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceAPEManager, rpcAPEManagerAddChain), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func RemoveChain(
cli *client.Client,
req *apemanager.RemoveChainRequest,
opts ...client.CallOption,
) (*apemanager.RemoveChainResponse, error) {
resp := new(apemanager.RemoveChainResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceAPEManager, rpcAPEManagerRemoveChain), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func ListChains(
cli *client.Client,
req *apemanager.ListChainsRequest,
opts ...client.CallOption,
) (*apemanager.ListChainsResponse, error) {
resp := new(apemanager.ListChainsResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceAPEManager, rpcAPEManagerListChains), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

View file

@ -0,0 +1,40 @@
package client
import (
"context"
"google.golang.org/grpc"
)
// CallOption is a messaging session option within Protobuf RPC.
type CallOption func(*callParameters)
type callParameters struct {
ctx context.Context // nolint:containedctx
dialer func(context.Context, grpc.ClientConnInterface) error
}
func defaultCallParameters() *callParameters {
return &callParameters{
ctx: context.Background(),
}
}
// WithContext returns option to specify call context. If provided, all network
// communications will be based on this context. Otherwise, context.Background()
// is used.
//
// Context SHOULD NOT be nil.
func WithContext(ctx context.Context) CallOption {
return func(prm *callParameters) {
prm.ctx = ctx
}
}
// WithDialer returns option to specify grpc dialer. If passed, it will be
// called after the connection is successfully created.
func WithDialer(dialer func(context.Context, grpc.ClientConnInterface) error) CallOption {
return func(prm *callParameters) {
prm.dialer = dialer
}
}

30
api/rpc/client/client.go Normal file
View file

@ -0,0 +1,30 @@
package client
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/proto/encoding"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Client represents client for exchanging messages
// with a remote server using Protobuf RPC.
type Client struct {
cfg
}
// New creates, configures via options and returns new Client instance.
func New(opts ...Option) *Client {
var c Client
c.initDefault()
for _, opt := range opts {
opt(&c.cfg)
}
c.grpcDialOpts = append(c.grpcDialOpts, grpc.WithDefaultCallOptions(grpc.ForceCodec(encoding.ProtoCodec{})))
if c.tlsCfg != nil {
c.grpcDialOpts = append(c.grpcDialOpts, grpc.WithTransportCredentials(credentials.NewTLS(c.tlsCfg)))
}
return &c
}

24
api/rpc/client/conn.go Normal file
View file

@ -0,0 +1,24 @@
package client
import (
"io"
"google.golang.org/grpc"
)
// Conn is an interface for grpc client connection.
type Conn interface {
grpc.ClientConnInterface
io.Closer
}
// Conn returns underlying connection.
//
// Returns non-nil result after the first Init() call
// completed without a connection error.
//
// Client should not be used after Close() call
// on the connection: behavior is undefined.
func (c *Client) Conn() io.Closer {
return c.conn
}

72
api/rpc/client/connect.go Normal file
View file

@ -0,0 +1,72 @@
package client
import (
"context"
"errors"
"fmt"
"net"
"net/url"
grpcstd "google.golang.org/grpc"
)
var errInvalidEndpoint = errors.New("invalid endpoint options")
func (c *Client) openGRPCConn(ctx context.Context, dialer func(ctx context.Context, cc grpcstd.ClientConnInterface) error) error {
if c.conn != nil {
return nil
}
if c.addr == "" {
return errInvalidEndpoint
}
var err error
c.conn, err = grpcstd.NewClient(c.addr, c.grpcDialOpts...)
if err != nil {
return fmt.Errorf("gRPC new client: %w", err)
}
if dialer != nil {
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
if err := dialer(ctx, c.conn); err != nil {
_ = c.conn.Close()
return fmt.Errorf("gRPC dial: %w", err)
}
}
return nil
}
// ParseURI parses s as address and returns a host and a flag
// indicating that TLS is enabled. If multi-address is provided
// the argument is returned unchanged.
func ParseURI(s string) (string, bool, error) {
uri, err := url.ParseRequestURI(s)
if err != nil {
return s, false, nil
}
// check if passed string was parsed correctly
// URIs that do not start with a slash after the scheme are interpreted as:
// `scheme:opaque` => if `opaque` is not empty, then it is supposed that URI
// is in `host:port` format
if uri.Host == "" {
uri.Host = uri.Scheme
uri.Scheme = grpcScheme // assume GRPC by default
if uri.Opaque != "" {
uri.Host = net.JoinHostPort(uri.Host, uri.Opaque)
}
}
switch uri.Scheme {
case grpcTLSScheme, grpcScheme:
default:
return "", false, fmt.Errorf("unsupported scheme: %s", uri.Scheme)
}
return uri.Host, uri.Scheme == grpcTLSScheme, nil
}

124
api/rpc/client/flows.go Normal file
View file

@ -0,0 +1,124 @@
package client
import (
"errors"
"io"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
)
// SendUnary initializes communication session by RPC info, performs unary RPC
// and closes the session.
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
rw, err := cli.Init(info, opts...)
if err != nil {
return err
}
err = rw.WriteMessage(req)
if err != nil {
return err
}
err = rw.ReadMessage(resp)
if err != nil {
return err
}
return rw.Close()
}
// MessageWriterCloser wraps MessageWriter
// and io.Closer interfaces.
type MessageWriterCloser interface {
MessageWriter
io.Closer
}
type clientStreamWriterCloser struct {
MessageReadWriter
resp message.Message
}
func (c *clientStreamWriterCloser) Close() error {
err := c.MessageReadWriter.Close()
if err != nil {
return err
}
return c.ReadMessage(c.resp)
}
// OpenClientStream initializes communication session by RPC info, opens client-side stream
// and returns its interface.
//
// All stream writes must be performed before the closing. Close must be called once.
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
rw, err := cli.Init(info, opts...)
if err != nil {
return nil, err
}
return &clientStreamWriterCloser{
MessageReadWriter: rw,
resp: resp,
}, nil
}
// MessageReaderCloser wraps MessageReader
// and io.Closer interface.
type MessageReaderCloser interface {
MessageReader
io.Closer
}
type serverStreamReaderCloser struct {
rw MessageReadWriter
once sync.Once
req message.Message
}
func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
var err error
s.once.Do(func() {
err = s.rw.WriteMessage(s.req)
})
if err != nil {
return err
}
err = s.rw.ReadMessage(msg)
if !errors.Is(err, io.EOF) {
return err
}
err = s.rw.Close()
if err != nil {
return err
}
return io.EOF
}
// OpenServerStream initializes communication session by RPC info, opens server-side stream
// and returns its interface.
//
// All stream reads must be performed before the closing. Close must be called once.
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
rw, err := cli.Init(info, opts...)
if err != nil {
return nil, err
}
return &serverStreamReaderCloser{
rw: rw,
req: req,
}, nil
}

69
api/rpc/client/init.go Normal file
View file

@ -0,0 +1,69 @@
package client
import (
"context"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
"google.golang.org/grpc"
)
// MessageReader is an interface of the Message reader.
type MessageReader interface {
// ReadMessage reads the next Message.
//
// Returns io.EOF if there are no more messages to read.
// ReadMessage should not be called after io.EOF occasion.
ReadMessage(message.Message) error
}
// MessageWriter is an interface of the Message writer.
type MessageWriter interface {
// WriteMessage writers the next Message.
//
// WriteMessage should not be called after any error.
WriteMessage(message.Message) error
}
// MessageReadWriter is a component interface
// for transmitting raw Protobuf messages.
type MessageReadWriter interface {
MessageReader
MessageWriter
// Closes the communication session.
//
// All calls to send/receive messages must be done before closing.
io.Closer
}
// Init initiates a messaging session and returns the interface for message transmitting.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
prm := defaultCallParameters()
for _, opt := range opts {
opt(prm)
}
if err := c.openGRPCConn(prm.ctx, prm.dialer); err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(prm.ctx)
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
StreamName: info.Name,
ServerStreams: info.ServerStream(),
ClientStreams: info.ClientStream(),
}, toMethodName(info))
if err != nil {
cancel()
return nil, err
}
return &streamWrapper{
ClientStream: stream,
cancel: cancel,
timeout: c.rwTimeout,
}, nil
}

129
api/rpc/client/options.go Normal file
View file

@ -0,0 +1,129 @@
package client
import (
"crypto/tls"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
grpcScheme = "grpc"
grpcTLSScheme = "grpcs"
)
// Option is a Client's option.
type Option func(*cfg)
type cfg struct {
addr string
dialTimeout time.Duration
rwTimeout time.Duration
tlsCfg *tls.Config
grpcDialOpts []grpc.DialOption
conn Conn
}
const (
defaultDialTimeout = 5 * time.Second
defaultRWTimeout = 1 * time.Minute
)
func (c *cfg) initDefault() {
c.dialTimeout = defaultDialTimeout
c.rwTimeout = defaultRWTimeout
c.grpcDialOpts = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}
// WithNetworkAddress returns option to specify
// network address of the remote server.
//
// Ignored if WithGRPCConn is provided.
func WithNetworkAddress(v string) Option {
return func(c *cfg) {
if v != "" {
c.addr = v
}
}
}
// WithNetworkURIAddress combines WithNetworkAddress and WithTLSCfg options
// based on arguments.
//
// Do not use along with WithNetworkAddress and WithTLSCfg.
//
// Ignored if WithGRPCConn is provided.
func WithNetworkURIAddress(addr string, tlsCfg *tls.Config) []Option {
host, isTLS, err := ParseURI(addr)
if err != nil {
return nil
}
opts := make([]Option, 2)
opts[0] = WithNetworkAddress(host)
if isTLS {
if tlsCfg == nil {
tlsCfg = &tls.Config{}
}
opts[1] = WithTLSCfg(tlsCfg)
} else {
opts[1] = WithTLSCfg(nil)
}
return opts
}
// WithDialTimeout returns option to specify
// dial timeout of the remote server connection.
//
// Ignored if WithGRPCConn is provided.
func WithDialTimeout(v time.Duration) Option {
return func(c *cfg) {
if v > 0 {
c.dialTimeout = v
}
}
}
// WithRWTimeout returns option to specify timeout
// for reading and writing single gRPC message.
func WithRWTimeout(v time.Duration) Option {
return func(c *cfg) {
if v > 0 {
c.rwTimeout = v
}
}
}
// WithTLSCfg returns option to specify
// TLS configuration.
//
// Ignored if WithGRPCConn is provided.
func WithTLSCfg(v *tls.Config) Option {
return func(c *cfg) {
c.tlsCfg = v
}
}
// WithGRPCConn returns option to specify
// gRPC virtual connection.
func WithGRPCConn(v Conn) Option {
return func(c *cfg) {
if v != nil {
c.conn = v
}
}
}
// WithGRPCDialOptions returns an option to specify grpc.DialOption.
func WithGRPCDialOptions(opts []grpc.DialOption) Option {
return func(c *cfg) {
c.grpcDialOpts = append(c.grpcDialOpts, opts...)
}
}

View file

@ -0,0 +1,197 @@
package client
import (
"crypto/tls"
"testing"
"github.com/stretchr/testify/require"
)
func TestWithNetworkURIAddress(t *testing.T) {
hostPort := "frostfs.example.com:8080"
apiPort := "127.0.0.1:8080"
serverName := "testServer"
testCases := []struct {
uri string
tlsConfig *tls.Config
wantHost string
wantTLS bool
}{
{
uri: grpcScheme + "://" + hostPort,
tlsConfig: nil,
wantHost: "frostfs.example.com:8080",
wantTLS: false,
},
{
uri: grpcScheme + "://" + hostPort,
tlsConfig: &tls.Config{},
wantHost: "frostfs.example.com:8080",
wantTLS: false,
},
{
uri: grpcTLSScheme + "://" + hostPort,
tlsConfig: nil,
wantHost: "frostfs.example.com:8080",
wantTLS: true,
},
{
uri: grpcTLSScheme + "://" + hostPort,
tlsConfig: &tls.Config{ServerName: serverName},
wantHost: "frostfs.example.com:8080",
wantTLS: true,
},
{
uri: "wrongScheme://" + hostPort,
tlsConfig: nil,
wantHost: "",
wantTLS: false,
},
{
uri: "impossibleToParseIt",
tlsConfig: nil,
wantHost: "impossibleToParseIt",
wantTLS: false,
},
{
uri: hostPort,
tlsConfig: nil,
wantHost: hostPort,
wantTLS: false,
},
{
uri: apiPort,
tlsConfig: nil,
wantHost: apiPort,
wantTLS: false,
},
}
for _, test := range testCases {
cfg := &cfg{}
opts := WithNetworkURIAddress(test.uri, test.tlsConfig)
for _, opt := range opts {
opt(cfg)
}
require.Equal(t, test.wantHost, cfg.addr, test.uri)
require.Equal(t, test.wantTLS, cfg.tlsCfg != nil, test.uri)
// check if custom tlsConfig was applied
if test.tlsConfig != nil && test.wantTLS {
require.Equal(t, test.tlsConfig.ServerName, cfg.tlsCfg.ServerName, test.uri)
}
}
}
func Test_WithNetworkAddress_WithTLS_WithNetworkURIAddress(t *testing.T) {
addr1, addr2 := "example1.com:8080", "example2.com:8080"
testCases := []struct {
addr string
withTLS bool
uri string
wantHost string
wantTLS bool
}{
{
addr: addr1,
withTLS: true,
uri: grpcScheme + "://" + addr2,
wantHost: addr2,
wantTLS: false,
},
{
addr: addr1,
withTLS: false,
uri: grpcTLSScheme + "://" + addr2,
wantHost: addr2,
wantTLS: true,
},
}
for _, test := range testCases {
// order:
// 1. WithNetworkAddress
// 2. WithTLSCfg(if test.withTLS == true)
// 3. WithNetworkURIAddress
config := &cfg{}
opts := []Option{WithNetworkAddress(test.addr)}
if test.withTLS {
opts = append(opts, WithTLSCfg(&tls.Config{}))
}
opts = append(opts, WithNetworkURIAddress(test.uri, nil)...)
for _, opt := range opts {
opt(config)
}
require.Equal(t, test.wantHost, config.addr, test.addr)
require.Equal(t, test.wantTLS, config.tlsCfg != nil, test.addr)
}
}
func Test_WithNetworkURIAddress_WithTLS_WithNetworkAddress(t *testing.T) {
addr1, addr2 := "example1.com:8080", "example2.com:8080"
testCases := []struct {
addr string
withTLS bool
uri string
wantHost string
wantTLS bool
}{
{
uri: grpcScheme + "://" + addr1,
addr: addr2,
withTLS: true,
wantHost: addr2,
wantTLS: true,
},
{
uri: grpcTLSScheme + "://" + addr1,
addr: addr2,
withTLS: false,
wantHost: addr2,
wantTLS: true,
},
}
for _, test := range testCases {
// order:
// 1. WithNetworkURIAddress
// 2. WithNetworkAddress
// 3. WithTLSCfg(if test.withTLS == true)
config := &cfg{}
opts := WithNetworkURIAddress(test.uri, nil)
opts = append(opts, WithNetworkAddress(test.addr))
if test.withTLS {
opts = append(opts, WithTLSCfg(&tls.Config{}))
}
for _, opt := range opts {
opt(config)
}
require.Equal(t, test.wantHost, config.addr, test.uri)
require.Equal(t, test.wantTLS, config.tlsCfg != nil, test.uri)
}
}

View file

@ -0,0 +1,58 @@
package client
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
"google.golang.org/grpc"
)
type streamWrapper struct {
grpc.ClientStream
timeout time.Duration
cancel context.CancelFunc
}
func (w streamWrapper) ReadMessage(m message.Message) error {
// Can be optimized: we can create blank message here.
gm := m.ToGRPCMessage()
err := w.withTimeout(func() error {
return w.ClientStream.RecvMsg(gm)
})
if err != nil {
return err
}
return m.FromGRPCMessage(gm)
}
func (w streamWrapper) WriteMessage(m message.Message) error {
return w.withTimeout(func() error {
return w.ClientStream.SendMsg(m.ToGRPCMessage())
})
}
func (w *streamWrapper) Close() error {
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) withTimeout(closure func() error) error {
ch := make(chan error, 1)
go func() {
ch <- closure()
close(ch)
}()
tt := time.NewTimer(w.timeout)
select {
case err := <-ch:
tt.Stop()
return err
case <-tt.C:
w.cancel()
return context.DeadlineExceeded
}
}

13
api/rpc/client/util.go Normal file
View file

@ -0,0 +1,13 @@
package client
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
)
const methodNameFmt = "/%s/%s"
func toMethodName(p common.CallMethodInfo) string {
return fmt.Sprintf(methodNameFmt, p.Service, p.Name)
}

10
api/rpc/common.go Normal file
View file

@ -0,0 +1,10 @@
package rpc
const (
// serviceNamePrefix is still used in "old" services but should be
// considered as deprecated. Since new services use "frostfs" root,
// `frostfsServiceNamePrefix` must be used for their rpc interface.
serviceNamePrefix = "neo.fs.v2."
frostfsServiceNamePrefix = "frostfs.v2."
)

75
api/rpc/common/call.go Normal file
View file

@ -0,0 +1,75 @@
package common
type callType uint8
const (
_ callType = iota
callUnary
callClientStream
callServerStream
callBidirStream
)
// CallMethodInfo is an information about the RPC.
type CallMethodInfo struct {
// Name of the service.
Service string
// Name of the RPC.
Name string
t callType
}
// ServerStream checks if CallMethodInfo contains
// information about the server-side streaming RPC.
func (c CallMethodInfo) ServerStream() bool {
return c.t == callServerStream || c.t == callBidirStream
}
// ClientStream checks if CallMethodInfo contains
// information about the client-side streaming RPC.
func (c CallMethodInfo) ClientStream() bool {
return c.t == callClientStream || c.t == callBidirStream
}
func (c *CallMethodInfo) setCommon(service, name string) {
c.Service = service
c.Name = name
}
// CallMethodInfoUnary returns CallMethodInfo structure
// initialized for the unary RPC.
func CallMethodInfoUnary(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callUnary
return
}
// CallMethodInfoClientStream returns CallMethodInfo structure
// initialized for the client-side streaming RPC.
func CallMethodInfoClientStream(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callClientStream
return
}
// CallMethodInfoServerStream returns CallMethodInfo structure
// initialized for the server-side streaming RPC.
func CallMethodInfoServerStream(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callServerStream
return
}
// CallMethodInfoBidirectionalStream returns CallMethodInfo structure
// initialized for the bidirectional streaming RPC.
func CallMethodInfoBidirectionalStream(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callBidirStream
return
}

View file

@ -0,0 +1,49 @@
package common_test
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"github.com/stretchr/testify/require"
)
const (
testServiceName = "test service"
testRPCName = "test RPC"
)
func TestCallMethodInfoUnary(t *testing.T) {
i := common.CallMethodInfoUnary(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.False(t, i.ClientStream())
require.False(t, i.ServerStream())
}
func TestCallMethodInfoServerStream(t *testing.T) {
i := common.CallMethodInfoServerStream(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.False(t, i.ClientStream())
require.True(t, i.ServerStream())
}
func TestCallMethodInfoClientStream(t *testing.T) {
i := common.CallMethodInfoClientStream(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.True(t, i.ClientStream())
require.False(t, i.ServerStream())
}
func TestCallMethodInfoBidirectionalStream(t *testing.T) {
i := common.CallMethodInfoBidirectionalStream(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.True(t, i.ClientStream())
require.True(t, i.ServerStream())
}

82
api/rpc/container.go Normal file
View file

@ -0,0 +1,82 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
)
const serviceContainer = serviceNamePrefix + "container.ContainerService"
const (
rpcContainerPut = "Put"
rpcContainerGet = "Get"
rpcContainerDel = "Delete"
rpcContainerList = "List"
rpcContainerGetEACL = "GetExtendedACL"
rpcContainerUsedSpace = "AnnounceUsedSpace"
)
// PutContainer executes ContainerService.Put RPC.
func PutContainer(
cli *client.Client,
req *container.PutRequest,
opts ...client.CallOption,
) (*container.PutResponse, error) {
resp := new(container.PutResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceContainer, rpcContainerPut), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// GetContainer executes ContainerService.Get RPC.
func GetContainer(
cli *client.Client,
req *container.GetRequest,
opts ...client.CallOption,
) (*container.GetResponse, error) {
resp := new(container.GetResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceContainer, rpcContainerGet), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// DeleteContainer executes ContainerService.Delete RPC.
func DeleteContainer(
cli *client.Client,
req *container.DeleteRequest,
opts ...client.CallOption,
) (*container.PutResponse, error) {
resp := new(container.PutResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceContainer, rpcContainerDel), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// ListContainers executes ContainerService.List RPC.
func ListContainers(
cli *client.Client,
req *container.ListRequest,
opts ...client.CallOption,
) (*container.ListResponse, error) {
resp := new(container.ListResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceContainer, rpcContainerList), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

4
api/rpc/grpc/init.go Normal file
View file

@ -0,0 +1,4 @@
package grpc
// Message represents raw gRPC message.
type Message any

View file

@ -0,0 +1,40 @@
package message
import (
"encoding/json"
)
// GRPCConvertedMessage is an interface
// of the gRPC message that is used
// for Message encoding/decoding.
type GRPCConvertedMessage interface {
UnmarshalProtobuf([]byte) error
}
// Unmarshal decodes m from its Protobuf binary representation
// via related gRPC message.
//
// gm should be tof the same type as the m.ToGRPCMessage() return.
func Unmarshal(m Message, data []byte, gm GRPCConvertedMessage) error {
if err := gm.UnmarshalProtobuf(data); err != nil {
return err
}
return m.FromGRPCMessage(gm)
}
// MarshalJSON encodes m to Protobuf JSON representation.
func MarshalJSON(m Message) ([]byte, error) {
return json.Marshal(m.ToGRPCMessage())
}
// UnmarshalJSON decodes m from its Protobuf JSON representation
// via related gRPC message.
//
// gm should be tof the same type as the m.ToGRPCMessage() return.
func UnmarshalJSON(m Message, data []byte, gm any) error {
if err := json.Unmarshal(data, gm); err != nil {
return err
}
return m.FromGRPCMessage(gm)
}

View file

@ -0,0 +1,43 @@
package message
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/grpc"
)
// Message represents raw Protobuf message
// that can be transmitted via several
// transport protocols.
type Message interface {
// Must return gRPC message that can
// be used for gRPC protocol transmission.
ToGRPCMessage() grpc.Message
// Must restore the message from related
// gRPC message.
//
// If gRPC message is not a related one,
// ErrUnexpectedMessageType can be returned
// to indicate this.
FromGRPCMessage(grpc.Message) error
}
// ErrUnexpectedMessageType is an error that
// is used to indicate message mismatch.
type ErrUnexpectedMessageType struct {
exp, act any
}
// NewUnexpectedMessageType initializes an error about message mismatch
// between act and exp.
func NewUnexpectedMessageType(act, exp any) ErrUnexpectedMessageType {
return ErrUnexpectedMessageType{
exp: exp,
act: act,
}
}
func (e ErrUnexpectedMessageType) Error() string {
return fmt.Sprintf("unexpected message type %T: expected %T", e.act, e.exp)
}

View file

@ -0,0 +1,126 @@
package messagetest
import (
"encoding/json"
"errors"
"fmt"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/proto/encoding"
"github.com/stretchr/testify/require"
)
type jsonMessage interface {
json.Marshaler
json.Unmarshaler
}
type binaryMessage interface {
StableMarshal([]byte) []byte
StableSize() int
Unmarshal([]byte) error
}
func TestRPCMessage(t *testing.T, msgGens ...func(empty bool) message.Message) {
for _, msgGen := range msgGens {
msg := msgGen(false)
t.Run(fmt.Sprintf("convert_%T", msg), func(t *testing.T) {
msg := msgGen(false)
err := msg.FromGRPCMessage(100)
require.True(t, errors.As(err, new(message.ErrUnexpectedMessageType)))
msg2 := msgGen(true)
err = msg2.FromGRPCMessage(msg.ToGRPCMessage())
require.NoError(t, err)
require.Equal(t, msg, msg2)
})
t.Run("encoding", func(t *testing.T) {
if jm, ok := msg.(jsonMessage); ok {
t.Run(fmt.Sprintf("JSON_%T", msg), func(t *testing.T) {
data, err := jm.MarshalJSON()
require.NoError(t, err)
jm2 := msgGen(true).(jsonMessage)
require.NoError(t, jm2.UnmarshalJSON(data))
require.Equal(t, jm, jm2)
})
}
if bm, ok := msg.(binaryMessage); ok {
t.Run(fmt.Sprintf("%T.StableSize() does no allocations", bm), func(t *testing.T) {
require.Zero(t, testing.AllocsPerRun(1000, func() {
_ = bm.StableSize()
}))
})
t.Run(fmt.Sprintf("Binary_%T", msg), func(t *testing.T) {
data := bm.StableMarshal(nil)
bm2 := msgGen(true).(binaryMessage)
require.NoError(t, bm2.Unmarshal(data))
require.Equal(t, bm, bm2)
})
}
t.Run("compatibility", func(t *testing.T) {
testCompatibility(t, msgGen)
})
})
}
}
func testCompatibility(t *testing.T, msgGen func(empty bool) message.Message) {
compareBinary := func(t *testing.T, msg message.Message) {
am, ok := msg.(binaryMessage)
if !ok {
t.Skip()
}
a := am.StableMarshal(nil)
b := msg.ToGRPCMessage().(encoding.ProtoMarshaler).MarshalProtobuf(nil)
if len(a) == 0 {
require.Empty(t, b)
} else {
require.Equal(t, a, b)
}
}
compareJSON := func(t *testing.T, msg message.Message) {
am, ok := msg.(jsonMessage)
if !ok {
t.Skip()
}
a, err := am.MarshalJSON()
require.NoError(t, err)
b, err := json.Marshal(msg.ToGRPCMessage())
require.NoError(t, err)
require.JSONEq(t, string(a), string(b))
}
t.Run("empty", func(t *testing.T) {
msg := msgGen(true)
t.Run(fmt.Sprintf("Binary_%T", msg), func(t *testing.T) {
compareBinary(t, msg)
})
t.Run(fmt.Sprintf("JSON_%T", msg), func(t *testing.T) {
compareJSON(t, msg)
})
})
t.Run("not empty", func(t *testing.T) {
msg := msgGen(false)
t.Run(fmt.Sprintf("Binary_%T", msg), func(t *testing.T) {
compareBinary(t, msg)
})
t.Run(fmt.Sprintf("JSON_%T", msg), func(t *testing.T) {
compareJSON(t, msg)
})
})
}

62
api/rpc/netmap.go Normal file
View file

@ -0,0 +1,62 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
)
const serviceNetmap = serviceNamePrefix + "netmap.NetmapService"
const (
rpcNetmapNodeInfo = "LocalNodeInfo"
rpcNetmapNetInfo = "NetworkInfo"
rpcNetmapSnapshot = "NetmapSnapshot"
)
// LocalNodeInfo executes NetmapService.LocalNodeInfo RPC.
func LocalNodeInfo(
cli *client.Client,
req *netmap.LocalNodeInfoRequest,
opts ...client.CallOption,
) (*netmap.LocalNodeInfoResponse, error) {
resp := new(netmap.LocalNodeInfoResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceNetmap, rpcNetmapNodeInfo), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// NetworkInfo executes NetmapService.NetworkInfo RPC.
func NetworkInfo(
cli *client.Client,
req *netmap.NetworkInfoRequest,
opts ...client.CallOption,
) (*netmap.NetworkInfoResponse, error) {
resp := new(netmap.NetworkInfoResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceNetmap, rpcNetmapNetInfo), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// NetMapSnapshot executes NetmapService.NetmapSnapshot RPC.
func NetMapSnapshot(
cli *client.Client,
req *netmap.SnapshotRequest,
opts ...client.CallOption,
) (*netmap.SnapshotResponse, error) {
resp := new(netmap.SnapshotResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceNetmap, rpcNetmapSnapshot), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

243
api/rpc/object.go Normal file
View file

@ -0,0 +1,243 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
)
const serviceObject = serviceNamePrefix + "object.ObjectService"
const (
rpcObjectPut = "Put"
rpcObjectGet = "Get"
rpcObjectSearch = "Search"
rpcObjectRange = "GetRange"
rpcObjectHash = "GetRangeHash"
rpcObjectHead = "Head"
rpcObjectDelete = "Delete"
rpcObjectPutSingle = "PutSingle"
rpcObjectPatch = "Patch"
)
// PutRequestWriter is an object.PutRequest
// message streaming component.
type PutRequestWriter struct {
wc client.MessageWriterCloser
resp message.Message
}
// Write writes req to the stream.
func (w *PutRequestWriter) Write(req *object.PutRequest) error {
return w.wc.WriteMessage(req)
}
// Close closes the stream.
func (w *PutRequestWriter) Close() error {
return w.wc.Close()
}
// PutObject executes ObjectService.Put RPC.
func PutObject(
cli *client.Client,
resp *object.PutResponse,
opts ...client.CallOption,
) (*PutRequestWriter, error) {
wc, err := client.OpenClientStream(cli, common.CallMethodInfoClientStream(serviceObject, rpcObjectPut), resp, opts...)
if err != nil {
return nil, err
}
return &PutRequestWriter{
wc: wc,
resp: resp,
}, nil
}
// GetResponseReader is an object.GetResponse
// stream reader.
type GetResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *GetResponseReader) Read(resp *object.GetResponse) error {
return r.r.ReadMessage(resp)
}
// GetObject executes ObjectService.Get RPC.
func GetObject(
cli *client.Client,
req *object.GetRequest,
opts ...client.CallOption,
) (*GetResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceObject, rpcObjectGet), req, opts...)
if err != nil {
return nil, err
}
return &GetResponseReader{
r: wc,
}, nil
}
// GetResponseReader is an object.SearchResponse
// stream reader.
type SearchResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *SearchResponseReader) Read(resp *object.SearchResponse) error {
return r.r.ReadMessage(resp)
}
// SearchObjects executes ObjectService.Search RPC.
func SearchObjects(
cli *client.Client,
req *object.SearchRequest,
opts ...client.CallOption,
) (*SearchResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceObject, rpcObjectSearch), req, opts...)
if err != nil {
return nil, err
}
return &SearchResponseReader{
r: wc,
}, nil
}
// GetResponseReader is an object.GetRangeResponse
// stream reader.
type ObjectRangeResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *ObjectRangeResponseReader) Read(resp *object.GetRangeResponse) error {
return r.r.ReadMessage(resp)
}
// GetObjectRange executes ObjectService.GetRange RPC.
func GetObjectRange(
cli *client.Client,
req *object.GetRangeRequest,
opts ...client.CallOption,
) (*ObjectRangeResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceObject, rpcObjectRange), req, opts...)
if err != nil {
return nil, err
}
return &ObjectRangeResponseReader{
r: wc,
}, nil
}
// HeadObject executes ObjectService.Head RPC.
func HeadObject(
cli *client.Client,
req *object.HeadRequest,
opts ...client.CallOption,
) (*object.HeadResponse, error) {
resp := new(object.HeadResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceObject, rpcObjectHead), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// DeleteObject executes ObjectService.Delete RPC.
func DeleteObject(
cli *client.Client,
req *object.DeleteRequest,
opts ...client.CallOption,
) (*object.DeleteResponse, error) {
resp := new(object.DeleteResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceObject, rpcObjectDelete), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// HashObjectRange executes ObjectService.GetRangeHash RPC.
func HashObjectRange(
cli *client.Client,
req *object.GetRangeHashRequest,
opts ...client.CallOption,
) (*object.GetRangeHashResponse, error) {
resp := new(object.GetRangeHashResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceObject, rpcObjectHash), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// PutSingleObject executes ObjectService.PutSingle RPC.
func PutSingleObject(
cli *client.Client,
req *object.PutSingleRequest,
opts ...client.CallOption,
) (*object.PutSingleResponse, error) {
resp := new(object.PutSingleResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceObject, rpcObjectPutSingle), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
// PatchRequestWriter is an object.PatchRequest
// message streaming component.
type PatchRequestWriter struct {
wc client.MessageWriterCloser
resp message.Message
}
// Write writes req to the stream.
func (w *PatchRequestWriter) Write(req *object.PatchRequest) error {
return w.wc.WriteMessage(req)
}
// Close closes the stream.
func (w *PatchRequestWriter) Close() error {
return w.wc.Close()
}
// Patch executes ObjectService.Patch RPC.
func Patch(
cli *client.Client,
resp *object.PatchResponse,
opts ...client.CallOption,
) (*PatchRequestWriter, error) {
wc, err := client.OpenClientStream(cli, common.CallMethodInfoClientStream(serviceObject, rpcObjectPatch), resp, opts...)
if err != nil {
return nil, err
}
return &PatchRequestWriter{
wc: wc,
resp: resp,
}, nil
}

28
api/rpc/session.go Normal file
View file

@ -0,0 +1,28 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
)
const serviceSession = serviceNamePrefix + "session.SessionService"
const (
rpcSessionCreate = "Create"
)
func CreateSession(
cli *client.Client,
req *session.CreateRequest,
opts ...client.CallOption,
) (*session.CreateResponse, error) {
resp := new(session.CreateResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceSession, rpcSessionCreate), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}