Compare commits

No commits in common. "master" and "master" have entirely different histories.

28 changed files with 1811 additions and 2699 deletions

@ -12,20 +12,18 @@ import (
// SendUnary initializes a communication session by RPC info, performs the unary RPC
// and closes the session.
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
rw, err := cli.initInternal(info, opts...)
rw, err := cli.Init(info, opts...)
if err != nil {
return err
}
err = rw.WriteMessage(req)
if err != nil {
rw.cancel()
return err
}
err = rw.ReadMessage(resp)
if err != nil {
rw.cancel()
return err
}
@ -40,28 +38,18 @@ type MessageWriterCloser interface {
}
type clientStreamWriterCloser struct {
sw *streamWrapper
MessageReadWriter
resp message.Message
}
// WriteMessage implements MessageWriterCloser.
func (c *clientStreamWriterCloser) WriteMessage(m message.Message) error {
return c.sw.WriteMessage(m)
}
func (c *clientStreamWriterCloser) Close() error {
err := c.sw.closeSend()
err := c.MessageReadWriter.Close()
if err != nil {
c.sw.cancel()
return err
}
if err = c.sw.ReadMessage(c.resp); err != nil {
c.sw.cancel()
return err
}
return c.sw.Close()
return c.ReadMessage(c.resp)
}
// OpenClientStream initializes communication session by RPC info, opens client-side stream
@ -69,14 +57,14 @@ func (c *clientStreamWriterCloser) Close() error {
//
// All stream writes must be performed before the closing. Close must be called once.
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
rw, err := cli.initInternal(info, opts...)
rw, err := cli.Init(info, opts...)
if err != nil {
return nil, err
}
return &clientStreamWriterCloser{
sw: rw,
resp: resp,
MessageReadWriter: rw,
resp: resp,
}, nil
}
@ -88,7 +76,7 @@ type MessageReaderCloser interface {
}
type serverStreamReaderCloser struct {
rw *streamWrapper
rw MessageReadWriter
once sync.Once
@ -103,15 +91,11 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
})
if err != nil {
s.rw.cancel()
return err
}
err = s.rw.ReadMessage(msg)
if !errors.Is(err, io.EOF) {
if err != nil {
s.rw.cancel()
}
return err
}
@ -128,7 +112,7 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
//
// All stream reads must be performed before the closing. Close must be called once.
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
rw, err := cli.initInternal(info, opts...)
rw, err := cli.Init(info, opts...)
if err != nil {
return nil, err
}
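
For orientation, a minimal caller sketch for the three helpers above. The CallMethodInfo constructors, the service/method names, and the message types here are assumptions for illustration only, not part of this diff; assumed imports are errors, io, and the SDK's client, common and message packages.

// Hypothetical caller sketch; constructors and message types are assumed
// and may differ from the actual SDK surface.
func callUnary(cli *client.Client, req, resp message.Message) error {
	info := common.CallMethodInfoUnary("Service", "Method")
	return client.SendUnary(cli, info, req, resp)
}

func readServerStream(cli *client.Client, req message.Message, newResp func() message.Message) error {
	info := common.CallMethodInfoServerStream("Service", "StreamMethod")
	rdr, err := client.OpenServerStream(cli, info, req)
	if err != nil {
		return err
	}
	for {
		resp := newResp()
		if err := rdr.ReadMessage(resp); err != nil {
			if errors.Is(err, io.EOF) {
				return nil // stream drained
			}
			return err
		}
		// handle resp ...
	}
}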

@ -41,10 +41,6 @@ type MessageReadWriter interface {
// Init initiates a messaging session and returns the interface for message transmission.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
return c.initInternal(info, opts...)
}
func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*streamWrapper, error) {
prm := defaultCallParameters()
for _, opt := range opts {
@ -65,13 +61,7 @@ func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*
// would propagate to all subsequent read/write operations on the opened stream,
// which is not desired for the stream's lifecycle management.
dialTimeoutTimer := time.NewTimer(c.dialTimeout)
defer func() {
dialTimeoutTimer.Stop()
select {
case <-dialTimeoutTimer.C:
default:
}
}()
defer dialTimeoutTimer.Stop()
type newStreamRes struct {
stream grpc.ClientStream
@ -101,7 +91,7 @@ func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*
if res.stream != nil && res.err == nil {
_ = res.stream.CloseSend()
}
return nil, context.DeadlineExceeded
return nil, context.Canceled
case res = <-newStreamCh:
}
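
The comment above explains why the dial timeout is implemented with a timer rather than a derived context: a context deadline would outlive the dial and cancel later reads and writes on the opened stream. A self-contained sketch of that pattern (generic Go, not the SDK's exact code; it needs the context, io and time imports):

func openWithDialTimeout(ctx context.Context, dialTimeout time.Duration,
	open func(context.Context) (io.Closer, error),
) (io.Closer, error) {
	timer := time.NewTimer(dialTimeout)
	defer timer.Stop()

	type result struct {
		s   io.Closer
		err error
	}
	ch := make(chan result, 1) // buffered so the opener goroutine never blocks

	go func() {
		s, err := open(ctx) // the opened stream keeps the parent context
		ch <- result{s: s, err: err}
	}()

	select {
	case <-timer.C:
		// Best effort: close the stream if it arrives after the deadline.
		go func() {
			if r := <-ch; r.err == nil && r.s != nil {
				_ = r.s.Close()
			}
		}()
		// The diff shows both context.DeadlineExceeded and context.Canceled
		// being returned at this point; the concrete sentinel is a choice.
		return nil, context.DeadlineExceeded
	case r := <-ch:
		return r.s, r.err
	}
}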

@ -38,7 +38,6 @@ func (c *cfg) initDefault() {
c.rwTimeout = defaultRWTimeout
c.grpcDialOpts = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDisableServiceConfig(),
}
}

@ -34,13 +34,8 @@ func (w streamWrapper) WriteMessage(m message.Message) error {
})
}
func (w *streamWrapper) closeSend() error {
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) Close() error {
w.cancel()
return nil
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) withTimeout(closure func() error) error {
@ -55,10 +50,6 @@ func (w *streamWrapper) withTimeout(closure func() error) error {
select {
case err := <-ch:
tt.Stop()
select {
case <-tt.C:
default:
}
return err
case <-tt.C:
w.cancel()

@ -144,9 +144,6 @@ const (
// request parameter as the client sent it incorrectly, then this code should
// be used.
CommonFail_INVALID_ARGUMENT CommonFail = 4
// [**1029**] Resource exhausted failure. If the operation cannot be performed
// due to a lack of resources.
CommonFail_RESOURCE_EXHAUSTED CommonFail = 5
)
// Enum value maps for CommonFail.
@ -157,7 +154,6 @@ var (
2: "SIGNATURE_VERIFICATION_FAIL",
3: "NODE_UNDER_MAINTENANCE",
4: "INVALID_ARGUMENT",
5: "RESOURCE_EXHAUSTED",
}
CommonFail_value = map[string]int32{
"INTERNAL": 0,
@ -165,7 +161,6 @@ var (
"SIGNATURE_VERIFICATION_FAIL": 2,
"NODE_UNDER_MAINTENANCE": 3,
"INVALID_ARGUMENT": 4,
"RESOURCE_EXHAUSTED": 5,
}
)
@ -659,7 +654,7 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x04,
0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x45, 0x5f,
0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x10, 0x05, 0x2a, 0x11, 0x0a, 0x07, 0x53, 0x75, 0x63,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x9d, 0x01, 0x0a,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x85, 0x01, 0x0a,
0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x0c, 0x0a, 0x08, 0x49,
0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x57, 0x52, 0x4f,
0x4e, 0x47, 0x5f, 0x4d, 0x41, 0x47, 0x49, 0x43, 0x5f, 0x4e, 0x55, 0x4d, 0x42, 0x45, 0x52, 0x10,
@ -668,35 +663,34 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x44, 0x45, 0x52,
0x5f, 0x4d, 0x41, 0x49, 0x4e, 0x54, 0x45, 0x4e, 0x41, 0x4e, 0x43, 0x45, 0x10, 0x03, 0x12, 0x14,
0x0a, 0x10, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45,
0x4e, 0x54, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
0x5f, 0x45, 0x58, 0x48, 0x41, 0x55, 0x53, 0x54, 0x45, 0x44, 0x10, 0x05, 0x2a, 0x88, 0x01, 0x0a,
0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53,
0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42,
0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01,
0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52,
0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a,
0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f,
0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f,
0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61,
0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45,
0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a,
0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10,
0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31,
0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b,
0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11,
0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10,
0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12,
0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62,
0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e,
0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f,
0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a, 0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
0x4e, 0x54, 0x10, 0x04, 0x2a, 0x88, 0x01, 0x0a, 0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12,
0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44,
0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54,
0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b,
0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e,
0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52, 0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10,
0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45,
0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a,
0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f, 0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a,
0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13,
0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f,
0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f,
0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46,
0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f,
0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45,
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d,
0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72,
0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43,
0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d,
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a,
0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e,
0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
}
var file_api_status_grpc_types_proto_enumTypes = make([]protoimpl.EnumInfo, 7)

@ -144,9 +144,6 @@ const (
// request parameter as the client sent it incorrectly, then this code should
// be used.
CommonFail_INVALID_ARGUMENT CommonFail = 4
// [**1029**] Resource exhausted failure. If the operation cannot be performed
// due to a lack of resources.
CommonFail_RESOURCE_EXHAUSTED CommonFail = 5
)
// Enum value maps for CommonFail.
@ -157,7 +154,6 @@ var (
2: "SIGNATURE_VERIFICATION_FAIL",
3: "NODE_UNDER_MAINTENANCE",
4: "INVALID_ARGUMENT",
5: "RESOURCE_EXHAUSTED",
}
CommonFail_value = map[string]int32{
"INTERNAL": 0,
@ -165,7 +161,6 @@ var (
"SIGNATURE_VERIFICATION_FAIL": 2,
"NODE_UNDER_MAINTENANCE": 3,
"INVALID_ARGUMENT": 4,
"RESOURCE_EXHAUSTED": 5,
}
)
@ -681,7 +676,7 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x04,
0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x45, 0x5f,
0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x10, 0x05, 0x2a, 0x11, 0x0a, 0x07, 0x53, 0x75, 0x63,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x9d, 0x01, 0x0a,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x85, 0x01, 0x0a,
0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x0c, 0x0a, 0x08, 0x49,
0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x57, 0x52, 0x4f,
0x4e, 0x47, 0x5f, 0x4d, 0x41, 0x47, 0x49, 0x43, 0x5f, 0x4e, 0x55, 0x4d, 0x42, 0x45, 0x52, 0x10,
@ -690,35 +685,34 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x44, 0x45, 0x52,
0x5f, 0x4d, 0x41, 0x49, 0x4e, 0x54, 0x45, 0x4e, 0x41, 0x4e, 0x43, 0x45, 0x10, 0x03, 0x12, 0x14,
0x0a, 0x10, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45,
0x4e, 0x54, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
0x5f, 0x45, 0x58, 0x48, 0x41, 0x55, 0x53, 0x54, 0x45, 0x44, 0x10, 0x05, 0x2a, 0x88, 0x01, 0x0a,
0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53,
0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42,
0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01,
0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52,
0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a,
0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f,
0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f,
0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61,
0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45,
0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a,
0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10,
0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31,
0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b,
0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11,
0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10,
0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12,
0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62,
0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e,
0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f,
0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a, 0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
0x4e, 0x54, 0x10, 0x04, 0x2a, 0x88, 0x01, 0x0a, 0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12,
0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44,
0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54,
0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b,
0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e,
0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52, 0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10,
0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45,
0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a,
0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f, 0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a,
0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13,
0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f,
0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f,
0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46,
0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f,
0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45,
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d,
0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72,
0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43,
0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d,
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a,
0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e,
0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
}
var file_api_status_grpc_types_proto_enumTypes = make([]protoimpl.EnumInfo, 7)

@ -65,8 +65,6 @@ const (
NodeUnderMaintenance
// InvalidArgument is a local Code value for INVALID_ARGUMENT status.
InvalidArgument
// ResourceExhausted is a local Code value for RESOURCE_EXHAUSTED status.
ResourceExhausted
)
const (

@ -296,62 +296,3 @@ func (x *InvalidArgument) SetMessage(v string) {
func (x InvalidArgument) Message() string {
return x.v2.Message()
}
// ResourceExhausted is a failure status indicating that
// the operation cannot be performed due to a lack of resources.
// Instances provide Status and StatusV2 interfaces.
type ResourceExhausted struct {
v2 status.Status
}
const defaultResourceExhaustedMsg = "resource exhausted"
// Error implements the error interface.
func (x *ResourceExhausted) Error() string {
msg := x.v2.Message()
if msg == "" {
msg = defaultResourceExhaustedMsg
}
return errMessageStatusV2(
globalizeCodeV2(status.ResourceExhausted, status.GlobalizeCommonFail),
msg,
)
}
// implements local interface defined in FromStatusV2 func.
func (x *ResourceExhausted) fromStatusV2(st *status.Status) {
x.v2 = *st
}
// ToStatusV2 implements StatusV2 interface method.
// If the value was returned by FromStatusV2, returns the source message.
// Otherwise, returns message with
// - code: RESOURCE_EXHAUSTED;
// - string message: written message via SetMessage or
// "resource exhausted" as a default message;
// - details: empty.
func (x ResourceExhausted) ToStatusV2() *status.Status {
x.v2.SetCode(globalizeCodeV2(status.ResourceExhausted, status.GlobalizeCommonFail))
if x.v2.Message() == "" {
x.v2.SetMessage(defaultResourceExhaustedMsg)
}
return &x.v2
}
// SetMessage writes resource exhausted failure message.
// Message should be used for debug purposes only.
//
// See also Message.
func (x *ResourceExhausted) SetMessage(v string) {
x.v2.SetMessage(v)
}
// Message returns status message. Zero status returns empty message.
// Message should be used for debug purposes only.
//
// See also SetMessage.
func (x ResourceExhausted) Message() string {
return x.v2.Message()
}

@ -167,42 +167,3 @@ func TestInvalidArgument(t *testing.T) {
require.Equal(t, msg, stV2.Message())
})
}
func TestResourceExhausted(t *testing.T) {
t.Run("default", func(t *testing.T) {
var st apistatus.ResourceExhausted
require.Empty(t, st.Message())
})
t.Run("custom message", func(t *testing.T) {
var st apistatus.ResourceExhausted
msg := "some message"
st.SetMessage(msg)
stV2 := st.ToStatusV2()
require.Equal(t, msg, st.Message())
require.Equal(t, msg, stV2.Message())
})
t.Run("empty to V2", func(t *testing.T) {
var st apistatus.ResourceExhausted
stV2 := st.ToStatusV2()
require.Equal(t, "resource exhausted", stV2.Message())
})
t.Run("non-empty to V2", func(t *testing.T) {
var st apistatus.ResourceExhausted
msg := "some other msg"
st.SetMessage(msg)
stV2 := st.ToStatusV2()
require.Equal(t, msg, stV2.Message())
})
}

@ -80,8 +80,6 @@ func FromStatusV2(st *status.Status) Status {
decoder = new(NodeUnderMaintenance)
case status.InvalidArgument:
decoder = new(InvalidArgument)
case status.ResourceExhausted:
decoder = new(ResourceExhausted)
}
case object.LocalizeFailStatus(&code):
switch code {

@ -3,7 +3,6 @@ package cid
import (
"crypto/sha256"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
"github.com/mr-tron/base58"
@ -114,9 +113,3 @@ func (id *ID) DecodeString(s string) error {
func (id ID) String() string {
return id.EncodeToString()
}
// Cmp returns an integer comparing two base58-encoded container IDs lexicographically.
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
func (id ID) Cmp(id2 ID) int {
return strings.Compare(id.EncodeToString(), id2.EncodeToString())
}

@ -3,8 +3,6 @@ package cid_test
import (
"crypto/rand"
"crypto/sha256"
"slices"
"strings"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
@ -108,17 +106,3 @@ func TestID_Encode(t *testing.T) {
require.Equal(t, emptyID, id.EncodeToString())
})
}
func TestID_Cmp(t *testing.T) {
var arr []cid.ID
for i := 0; i < 3; i++ {
checksum := randSHA256Checksum()
arr = append(arr, cidtest.IDWithChecksum(checksum))
}
slices.SortFunc(arr, cid.ID.Cmp)
for i := 1; i < len(arr); i++ {
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
}
}

@ -4,7 +4,6 @@ import (
"crypto/ecdsa"
"crypto/sha256"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
@ -168,9 +167,3 @@ func (id *ID) UnmarshalJSON(data []byte) error {
return nil
}
// Cmp returns an integer comparing two base58-encoded object IDs lexicographically.
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
func (id ID) Cmp(id2 ID) int {
return strings.Compare(id.EncodeToString(), id2.EncodeToString())
}

@ -3,9 +3,7 @@ package oid
import (
"crypto/rand"
"crypto/sha256"
"slices"
"strconv"
"strings"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
@ -182,16 +180,3 @@ func TestID_Encode(t *testing.T) {
require.Equal(t, emptyID, id.EncodeToString())
})
}
func TestID_Cmp(t *testing.T) {
id1 := randID(t)
id2 := randID(t)
id3 := randID(t)
arr := []ID{id1, id2, id3}
slices.SortFunc(arr, ID.Cmp)
for i := 1; i < len(arr); i++ {
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
}
}

@ -6,7 +6,6 @@ import (
"crypto/sha256"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -328,11 +327,4 @@ func (s *payloadSizeLimiter) prepareFirstChild() {
s.current.SetAttributes()
// attributes will be added to parent in detachParent
// add expiration epoch to each part
for _, attr := range s.parAttrs {
if attr.Key() == objectV2.SysAttributeExpEpoch {
s.current.SetAttributes(attr)
}
}
}

File diff suppressed because it is too large

@ -1,330 +0,0 @@
package pool
import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type innerPool struct {
lock sync.RWMutex
sampler *sampler
clients []client
}
type connectionManager struct {
innerPools []*innerPool
rebalanceParams rebalanceParameters
clientBuilder clientBuilder
logger *zap.Logger
healthChecker *healthCheck
}
// newConnectionManager returns an instance of connectionManager configured according to the parameters.
//
// Before using connectionManager, you MUST call Dial.
func newConnectionManager(options InitParameters) (*connectionManager, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
manager := &connectionManager{
logger: options.logger,
rebalanceParams: rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
}
return manager, nil
}
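
A short lifecycle sketch for the manager above, assuming InitParameters has already been populated (key, node addresses, timeouts) as elsewhere in the pool package:

// Construct, dial (which also starts the background rebalance loop),
// and let the caller defer close() on shutdown.
func dialManager(ctx context.Context, options InitParameters) (*connectionManager, error) {
	cm, err := newConnectionManager(options)
	if err != nil {
		return nil, err
	}
	if err := cm.dial(ctx); err != nil {
		return nil, err
	}
	return cm, nil // callers should defer cm.close()
}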
func (cm *connectionManager) dial(ctx context.Context) error {
inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
var atLeastOneHealthy bool
for i, params := range cm.rebalanceParams.nodesParams {
clients := make([]client, len(params.weights))
for j, addr := range params.addresses {
clients[j] = cm.clientBuilder(addr)
if err := clients[j].dial(ctx); err != nil {
cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
continue
}
atLeastOneHealthy = true
}
source := rand.NewSource(time.Now().UnixNano())
sampl := newSampler(params.weights, source)
inner[i] = &innerPool{
sampler: sampl,
clients: clients,
}
}
if !atLeastOneHealthy {
return fmt.Errorf("at least one node must be healthy")
}
cm.innerPools = inner
cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
cm.healthChecker.startRebalance(ctx, cm.rebalance)
return nil
}
func (cm *connectionManager) rebalance(ctx context.Context) {
buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
for i, params := range cm.rebalanceParams.nodesParams {
buffers[i] = make([]float64, len(params.weights))
}
cm.updateNodesHealth(ctx, buffers)
}
func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
if cm.logger == nil {
return
}
cm.logger.Log(level, msg, fields...)
}
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no FrostFS peers configured")
}
nodesParamsMap := make(map[int]*nodesParam)
for _, param := range nodeParams {
nodes, ok := nodesParamsMap[param.priority]
if !ok {
nodes = &nodesParam{priority: param.priority}
}
nodes.addresses = append(nodes.addresses, param.address)
nodes.weights = append(nodes.weights, param.weight)
nodesParamsMap[param.priority] = nodes
}
nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
for _, nodes := range nodesParamsMap {
nodes.weights = adjustWeights(nodes.weights)
nodesParams = append(nodesParams, nodes)
}
sort.Slice(nodesParams, func(i, j int) bool {
return nodesParams[i].priority < nodesParams[j].priority
})
return nodesParams, nil
}
func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg := sync.WaitGroup{}
for i, inner := range cm.innerPools {
wg.Add(1)
bufferWeights := buffers[i]
go func(i int, _ *innerPool) {
defer wg.Done()
cm.updateInnerNodesHealth(ctx, i, bufferWeights)
}(i, inner)
}
wg.Wait()
}
func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
if i > len(cm.innerPools)-1 {
return
}
pool := cm.innerPools[i]
options := cm.rebalanceParams
healthyChanged := new(atomic.Bool)
wg := sync.WaitGroup{}
for j, cli := range pool.clients {
wg.Add(1)
go func(j int, cli client) {
defer wg.Done()
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c()
changed, err := restartIfUnhealthy(tctx, cli)
healthy := err == nil
if healthy {
bufferWeights[j] = options.nodesParams[i].weights[j]
} else {
bufferWeights[j] = 0
}
if changed {
fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
if err != nil {
fields = append(fields, zap.String("reason", err.Error()))
}
cm.log(zap.DebugLevel, "health has changed", fields...)
healthyChanged.Store(true)
}
}(j, cli)
}
wg.Wait()
if healthyChanged.Load() {
probabilities := adjustWeights(bufferWeights)
source := rand.NewSource(time.Now().UnixNano())
pool.lock.Lock()
pool.sampler = newSampler(probabilities, source)
pool.lock.Unlock()
}
}
// restartIfUnhealthy checks the health status of the client and recreates it if it is unhealthy.
// It reports whether the status was changed by this call and returns the error that caused the unhealthy status.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
defer func() {
if err != nil {
c.setUnhealthy()
} else {
c.setHealthy()
}
}()
wasHealthy := c.isHealthy()
if res, err := c.healthcheck(ctx); err == nil {
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
if err = c.restart(ctx); err != nil {
return wasHealthy, err
}
res, err := c.healthcheck(ctx)
if err != nil {
return wasHealthy, err
}
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
func adjustWeights(weights []float64) []float64 {
adjusted := make([]float64, len(weights))
sum := 0.0
for _, weight := range weights {
sum += weight
}
if sum > 0 {
for i, weight := range weights {
adjusted[i] = weight / sum
}
}
return adjusted
}
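
A quick worked example of the normalization above:

// adjustWeights turns raw weights into probabilities that sum to 1;
// an all-zero input stays all-zero.
probabilities := adjustWeights([]float64{1, 2, 1}) // [0.25, 0.5, 0.25]
zeroes := adjustWeights([]float64{0, 0, 0})        // [0, 0, 0]
_, _ = probabilities, zeroes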
func (cm *connectionManager) connection() (client, error) {
for _, inner := range cm.innerPools {
cp, err := inner.connection()
if err == nil {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
// iterate iterates over all clients in all innerPools.
func (cm *connectionManager) iterate(cb func(client)) {
for _, inner := range cm.innerPools {
for _, cl := range inner.clients {
if cl.isHealthy() {
cb(cl)
}
}
}
}
func (p *innerPool) connection() (client, error) {
p.lock.RLock() // need lock because of using p.sampler
defer p.lock.RUnlock()
if len(p.clients) == 1 {
cp := p.clients[0]
if cp.isHealthy() {
return cp, nil
}
return nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clients)
for range attempts {
i := p.sampler.Next()
if cp := p.clients[i]; cp.isHealthy() {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
func (cm connectionManager) Statistic() Statistic {
stat := Statistic{}
for _, inner := range cm.innerPools {
nodes := make([]string, 0, len(inner.clients))
for _, cl := range inner.clients {
if cl.isHealthy() {
nodes = append(nodes, cl.address())
}
node := NodeStatistic{
address: cl.address(),
methods: cl.methodsStatus(),
overallErrors: cl.overallErrorRate(),
currentErrors: cl.currentErrorRate(),
}
stat.nodes = append(stat.nodes, node)
stat.overallErrors += node.overallErrors
}
if len(stat.currentNodes) == 0 {
stat.currentNodes = nodes
}
}
return stat
}
func (cm *connectionManager) close() {
cm.healthChecker.stopRebalance()
// close all clients
for _, pools := range cm.innerPools {
for _, cli := range pools.clients {
_ = cli.close()
}
}
}

@ -1,47 +0,0 @@
package pool
import (
"context"
"time"
)
type healthCheck struct {
cancel context.CancelFunc
closedCh chan struct{}
clientRebalanceInterval time.Duration
}
func newHealthCheck(clientRebalanceInterval time.Duration) *healthCheck {
var h healthCheck
h.clientRebalanceInterval = clientRebalanceInterval
h.closedCh = make(chan struct{})
return &h
}
// startRebalance runs a loop that monitors connection health status.
func (h *healthCheck) startRebalance(ctx context.Context, callback func(ctx context.Context)) {
ctx, cancel := context.WithCancel(ctx)
h.cancel = cancel
go func() {
ticker := time.NewTicker(h.clientRebalanceInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(h.closedCh)
return
case <-ticker.C:
callback(ctx)
ticker.Reset(h.clientRebalanceInterval)
}
}
}()
}
func (h *healthCheck) stopRebalance() {
h.cancel()
<-h.closedCh
}
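
For reference, the lifecycle the connection manager uses with this helper, reduced to a sketch; the callback body here is a stand-in for connectionManager.rebalance:

hc := newHealthCheck(15 * time.Second)
hc.startRebalance(ctx, func(ctx context.Context) {
	// re-evaluate node health and rebuild samplers here
})
// ... later, on shutdown:
hc.stopRebalance() // cancels the loop and waits for it to exit via closedCh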

File diff suppressed because it is too large

@ -104,7 +104,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
condition := func() bool {
cp, err := clientPool.manager.connection()
cp, err := clientPool.connection()
if err != nil {
return false
}
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
@ -220,12 +220,13 @@ func TestOneOfTwoFailed(t *testing.T) {
err = pool.Dial(context.Background())
require.NoError(t, err)
require.NoError(t, err)
t.Cleanup(pool.Close)
time.Sleep(2 * time.Second)
for range 5 {
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
@ -368,7 +369,7 @@ func TestUpdateNodesHealth(t *testing.T) {
tc.prepareCli(cli)
p, log := newPool(t, cli)
p.manager.updateNodesHealth(ctx, [][]float64{{1}})
p.updateNodesHealth(ctx, [][]float64{{1}})
changed := tc.wasHealthy != tc.willHealthy
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
@ -384,19 +385,19 @@ func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
require.NoError(t, err)
return &Pool{
cache: cache,
key: newPrivateKey(t),
manager: &connectionManager{
innerPools: []*innerPool{{
sampler: newSampler([]float64{1}, rand.NewSource(0)),
clients: []client{cli},
}},
healthChecker: newHealthCheck(200 * time.Millisecond),
rebalanceParams: rebalanceParameters{
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
nodeRequestTimeout: time.Second,
},
logger: log},
innerPools: []*innerPool{{
sampler: newSampler([]float64{1}, rand.NewSource(0)),
clients: []client{cli},
}},
cache: cache,
key: newPrivateKey(t),
closedCh: make(chan struct{}),
rebalanceParams: rebalanceParameters{
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
nodeRequestTimeout: time.Second,
clientRebalanceInterval: 200 * time.Millisecond,
},
logger: log,
}, observedLog
}
@ -434,7 +435,7 @@ func TestTwoFailed(t *testing.T) {
time.Sleep(2 * time.Second)
_, err = pool.manager.connection()
_, err = pool.connection()
require.Error(t, err)
require.Contains(t, err.Error(), "no healthy")
}
@ -468,7 +469,7 @@ func TestSessionCache(t *testing.T) {
t.Cleanup(pool.Close)
// cache must contain session token
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
@ -481,7 +482,7 @@ func TestSessionCache(t *testing.T) {
require.Error(t, err)
// cache must not contain session token
cp, err = pool.manager.connection()
cp, err = pool.connection()
require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.False(t, ok)
@ -493,7 +494,7 @@ func TestSessionCache(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err = pool.manager.connection()
cp, err = pool.connection()
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
@ -537,7 +538,7 @@ func TestPriority(t *testing.T) {
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
firstNode := func() bool {
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey1)
@ -545,7 +546,7 @@ func TestPriority(t *testing.T) {
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
secondNode := func() bool {
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey2)
@ -582,7 +583,7 @@ func TestSessionCacheWithKey(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err := pool.manager.connection()
cp, err := pool.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
@ -635,8 +636,9 @@ func TestSessionTokenOwner(t *testing.T) {
cc.sessionTarget = func(tok session.Object) {
tkn = tok
}
err = p.initCall(&cc, prm, prmCtx)
err = p.initCallContext(&cc, prm, prmCtx)
require.NoError(t, err)
err = p.openDefaultSession(ctx, &cc)
require.NoError(t, err)
require.True(t, tkn.VerifySignature())
@ -920,14 +922,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
t.Cleanup(pool.Close)
for range errorThreshold {
conn, err := pool.manager.connection()
conn, err := pool.connection()
require.NoError(t, err)
require.Equal(t, nodes[0].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})
require.Error(t, err)
}
conn, err := pool.manager.connection()
conn, err := pool.connection()
require.NoError(t, err)
require.Equal(t, nodes[1].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})

@ -47,6 +47,9 @@ func TestHealthyReweight(t *testing.T) {
buffer = make([]float64, len(weights))
)
cache, err := newCache(0)
require.NoError(t, err)
client1 := newMockClient(names[0], *newPrivateKey(t))
client1.errOnDial()
@ -56,20 +59,22 @@ func TestHealthyReweight(t *testing.T) {
sampler: newSampler(weights, rand.NewSource(0)),
clients: []client{client1, client2},
}
cm := &connectionManager{
p := &Pool{
innerPools: []*innerPool{inner},
cache: cache,
key: newPrivateKey(t),
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
}
// check getting first node connection before rebalance happened
connection0, err := cm.connection()
connection0, err := p.connection()
require.NoError(t, err)
mock0 := connection0.(*mockClient)
require.Equal(t, names[0], mock0.address())
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
connection1, err := cm.connection()
connection1, err := p.connection()
require.NoError(t, err)
mock1 := connection1.(*mockClient)
require.Equal(t, names[1], mock1.address())
@ -79,10 +84,10 @@ func TestHealthyReweight(t *testing.T) {
inner.clients[0] = newMockClient(names[0], *newPrivateKey(t))
inner.lock.Unlock()
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
inner.sampler = newSampler(weights, rand.NewSource(0))
connection0, err = cm.connection()
connection0, err = p.connection()
require.NoError(t, err)
mock0 = connection0.(*mockClient)
require.Equal(t, names[0], mock0.address())
@ -103,12 +108,12 @@ func TestHealthyNoReweight(t *testing.T) {
newMockClient(names[1], *newPrivateKey(t)),
},
}
cm := &connectionManager{
p := &Pool{
innerPools: []*innerPool{inner},
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
}
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
inner.lock.RLock()
defer inner.lock.RUnlock()

@ -1,82 +0,0 @@
package tree
import (
"errors"
"sync"
"time"
)
type (
circuitBreaker struct {
breakDuration time.Duration
threshold int
mu sync.RWMutex
state map[uint64]state
}
state struct {
counter int
breakTimestamp time.Time
}
)
var ErrCBClosed = errors.New("circuit breaker is closed")
func newCircuitBreaker(breakDuration time.Duration, threshold int) *circuitBreaker {
return &circuitBreaker{
breakDuration: breakDuration,
threshold: threshold,
state: make(map[uint64]state),
}
}
func (cb *circuitBreaker) checkBreak(id uint64) error {
cb.mu.RLock()
s, ok := cb.state[id]
cb.mu.RUnlock()
if ok && time.Since(s.breakTimestamp) < cb.breakDuration {
return ErrCBClosed
}
return nil
}
func (cb *circuitBreaker) openBreak(id uint64) {
cb.mu.Lock()
defer cb.mu.Unlock()
delete(cb.state, id)
}
func (cb *circuitBreaker) incError(id uint64) {
cb.mu.Lock()
defer cb.mu.Unlock()
s := cb.state[id]
s.counter++
if s.counter >= cb.threshold {
s.counter = cb.threshold
if time.Since(s.breakTimestamp) >= cb.breakDuration {
s.breakTimestamp = time.Now()
}
}
cb.state[id] = s
}
func (cb *circuitBreaker) Do(id uint64, f func() error) error {
if err := cb.checkBreak(id); err != nil {
return err
}
err := f()
if err == nil {
cb.openBreak(id)
} else {
cb.incError(id)
}
return err
}
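
A minimal usage sketch of Do, mirroring how the tree pool keys the breaker by node hash for dials; dialTreeNode is a hypothetical stand-in:

cb := newCircuitBreaker(10*time.Second, 10)

err := cb.Do(nodeHash, func() error {
	return dialTreeNode(ctx, addr) // hypothetical dial
})
if errors.Is(err, ErrCBClosed) {
	// this node has failed `threshold` times recently; it is skipped
	// until breakDuration has elapsed
}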

@ -1,68 +0,0 @@
package tree
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestCircuitBreaker(t *testing.T) {
remoteErr := errors.New("service is being synchronized")
breakDuration := 1 * time.Second
threshold := 10
cb := newCircuitBreaker(breakDuration, threshold)
// Hit threshold
for i := 0; i < threshold; i++ {
err := cb.Do(1, func() error { return remoteErr })
require.ErrorIs(t, err, remoteErr)
}
// Different client should not be affected by threshold
require.NoError(t, cb.Do(2, func() error { return nil }))
// Immediate request should return circuit breaker error
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
// Request after breakDuration should be ok
time.Sleep(breakDuration)
require.NoError(t, cb.Do(1, func() error { return nil }))
// Try hitting threshold one more time after break duration
for i := 0; i < threshold; i++ {
err := cb.Do(1, func() error { return remoteErr })
require.ErrorIs(t, err, remoteErr)
}
// Immediate request should return circuit breaker error
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
}
func TestCircuitBreakerNoBlock(t *testing.T) {
remoteErr := errors.New("service is being synchronized")
funcDuration := 200 * time.Millisecond
threshold := 100
cb := newCircuitBreaker(10*funcDuration, threshold)
slowFunc := func() error {
time.Sleep(funcDuration)
return remoteErr
}
for i := 0; i < threshold; i++ {
// run in multiple goroutines Do function
go func() {
cb.Do(1, slowFunc)
}()
}
time.Sleep(funcDuration)
// eventually, at most one more func duration later, the circuit breaker will be
// closed and not blocked by the slow func executing under the mutex
require.Eventually(t, func() bool {
return errors.Is(cb.Do(1, func() error { return nil }), ErrCBClosed)
}, funcDuration, funcDuration/10)
}

@ -24,12 +24,10 @@ import (
)
const (
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultCircuitBreakerDuration = 10 * time.Second
defaultCircuitBreakerTreshold = 10
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
)
// SubTreeSort defines an order of nodes returned from GetSubTree RPC.
@ -78,8 +76,6 @@ type InitParameters struct {
dialOptions []grpc.DialOption
maxRequestAttempts int
netMapInfoSource NetMapInfoSource
circuitBreakerThreshold int
circuitBreakerDuration time.Duration
}
type NetMapInfoSource interface {
@ -121,8 +117,6 @@ type Pool struct {
// * retry in case of request failure (see Pool.requestWithRetry)
// startIndices will be used if netMapInfoSource is not set
startIndices [2]int
// circuit breaker for dial operations when netmap is being used
cb *circuitBreaker
}
type innerPool struct {
@ -254,10 +248,6 @@ func NewPool(options InitParameters) (*Pool, error) {
methods: methods,
netMapInfoSource: options.netMapInfoSource,
clientMap: make(map[uint64]client),
cb: newCircuitBreaker(
options.circuitBreakerDuration,
options.circuitBreakerThreshold,
),
}
if options.netMapInfoSource == nil {
@ -376,18 +366,6 @@ func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource)
x.netMapInfoSource = netMapInfoSource
}
// SetCircuitBreakerThreshold sets the number of consecutive failed connections before
// the circuit is considered closed and therefore returns an error immediately.
func (x *InitParameters) SetCircuitBreakerThreshold(circuitBreakerThreshold int) {
x.circuitBreakerThreshold = circuitBreakerThreshold
}
// SetCircuitBreakerDuration sets the duration for which the circuit is considered closed.
// This effectively limits connection attempts to one per duration.
func (x *InitParameters) SetCircuitBreakerDuration(circuitBreakerDuration time.Duration) {
x.circuitBreakerDuration = circuitBreakerDuration
}
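
A brief configuration sketch for the two knobs above; the values mirror the removed defaults (10 failures, 10 seconds):

var params InitParameters
params.SetCircuitBreakerThreshold(10)              // error immediately after 10 consecutive failed connections
params.SetCircuitBreakerDuration(10 * time.Second) // keep the circuit in that state for 10s per node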
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
@ -436,19 +414,12 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNod
//
// Must be initialized using Pool.GetSubTree; any other usage is unsafe.
type SubTreeReader struct {
cli *rpcapi.GetSubTreeResponseReader
probe *tree.GetSubTreeResponseBody
cli *rpcapi.GetSubTreeResponseReader
}
// Read reads another list of the subtree nodes.
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
i := 0
if x.probe != nil && len(buf) != 0 {
buf[0] = x.probe
x.probe = nil
i = 1
}
for ; i < len(buf); i++ {
for i := range buf {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
@ -465,10 +436,6 @@ func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
// ReadAll reads all subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*tree.GetSubTreeResponseBody
if x.probe != nil {
res = append(res, x.probe)
x.probe = nil
}
for {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
@ -485,12 +452,6 @@ func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
// Next gets the next node from subtree.
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
if x.probe != nil {
res := x.probe
x.probe = nil
return res, nil
}
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
@ -534,24 +495,16 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
}
var cli *rpcapi.GetSubTreeResponseReader
var probeBody *tree.GetSubTreeResponseBody
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
if inErr != nil {
return handleError("failed to get sub tree client", inErr)
}
probe := &tree.GetSubTreeResponse{}
inErr = cli.Read(probe)
probeBody = probe.GetBody()
return handleError("failed to get first resp from sub tree client", inErr)
return handleError("failed to get sub tree client", inErr)
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
if err != nil {
return nil, err
}
return &SubTreeReader{cli: cli, probe: probeBody}, nil
return &SubTreeReader{cli: cli}, nil
}
// AddNode invokes eponymous method from TreeServiceClient.
@ -811,14 +764,6 @@ func fillDefaultInitParams(params *InitParameters) {
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
if params.circuitBreakerDuration <= 0 {
params.circuitBreakerDuration = defaultCircuitBreakerDuration
}
if params.circuitBreakerThreshold <= 0 {
params.circuitBreakerThreshold = defaultCircuitBreakerTreshold
}
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
@ -1014,17 +959,14 @@ LOOP:
treeCl, ok := p.getClientFromMap(cnrNode.Hash())
if !ok {
err = p.cb.Do(cnrNode.Hash(), func() error {
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
return err
})
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
if err != nil {
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
continue
}
treeCl = p.addClientToMap(cnrNode.Hash(), treeCl)
p.addClientToMap(cnrNode.Hash(), treeCl)
}
attempts--
@ -1059,24 +1001,16 @@ func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
return cl, ok
}
func (p *Pool) addClientToMap(hash uint64, cl client) client {
func (p *Pool) addClientToMap(hash uint64, cl client) {
p.mutex.Lock()
defer p.mutex.Unlock()
if old, ok := p.clientMap[hash]; ok {
_ = cl.close()
return old
}
p.clientMap[hash] = cl
return cl
p.mutex.Unlock()
}
func (p *Pool) deleteClientFromMap(hash uint64) {
p.mutex.Lock()
if cli, ok := p.clientMap[hash]; ok {
_ = cli.close()
delete(p.clientMap, hash)
}
_ = p.clientMap[hash].close()
delete(p.clientMap, hash)
p.mutex.Unlock()
}
@ -1096,16 +1030,6 @@ func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*tre
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err = newTreeCl.dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
// We have to close the connection here after a failed `dial()`.
// This is NOT necessary in the object pool and the regular tree pool without netmap support, because:
// - the object pool uses the SDK object client, which closes the connection during the `dial()` call by itself,
// - the regular tree pool is going to reuse the connection by calling `redialIfNecessary()`.
// The tree pool with netmap support does not run a background goroutine, so we have to close the connection immediately.
if err = newTreeCl.close(); err != nil {
p.log(zap.WarnLevel, "failed to close recently dialed tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
}
return false
}

@ -1,345 +0,0 @@
package tree
import (
"bytes"
"context"
"errors"
"io"
"net"
"runtime"
"strconv"
"testing"
apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
apitree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
type mockTreeServer struct {
id int
srv *grpc.Server
lis net.Listener
key *keys.PrivateKey
healthy bool
addCounter int
getSubTreeError error
getSubTreeResponses []*tree.GetSubTreeResponse_Body
getSubTreeCounter int
}
type mockNetmapSource struct {
servers []*mockTreeServer
policy string
}
func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
nm := netmap.NetMap{}
nodes := make([]netmap.NodeInfo, len(m.servers))
for i, server := range m.servers {
ni := apinetmap.NodeInfo{}
ni.SetAddresses(server.lis.Addr().String())
ni.SetPublicKey(server.key.PublicKey().Bytes())
err := nodes[i].ReadFromV2(ni) // no other way to set address field in netmap.NodeInfo
if err != nil {
return nm, err
}
nodes[i].SetAttribute("id", strconv.Itoa(server.id))
}
nm.SetNodes(nodes)
return nm, nil
}
func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
p := netmap.PlacementPolicy{}
return p, p.DecodeString(m.policy)
}
func (m *mockTreeServer) Serve() {
go m.srv.Serve(m.lis)
}
func (m *mockTreeServer) Stop() {
m.srv.Stop()
}
func (m *mockTreeServer) Addr() string {
return m.lis.Addr().String()
}
func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) {
m.addCounter++
return &tree.AddResponse{}, nil
}
func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) GetSubTree(_ *tree.GetSubTreeRequest, s tree.TreeService_GetSubTreeServer) error {
m.getSubTreeCounter++
if m.getSubTreeError != nil {
return m.getSubTreeError
}
for i := range m.getSubTreeResponses {
if err := s.Send(&tree.GetSubTreeResponse{
Body: m.getSubTreeResponses[i],
}); err != nil {
return err
}
}
return nil
}
func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error {
panic("implement me")
}
func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) {
if m.healthy {
return new(tree.HealthcheckResponse), nil
}
return nil, errors.New("not healthy")
}
func createTestServer(t *testing.T, id int) *mockTreeServer {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
res := &mockTreeServer{
id: id,
srv: grpc.NewServer(),
lis: lis,
key: key,
healthy: true,
}
tree.RegisterTreeServiceServer(res.srv, res)
return res
}
func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) {
poolInitParams := InitParameters{}
servers := make([]*mockTreeServer, n)
for i := range servers {
servers[i] = createTestServer(t, i)
servers[i].healthy = true
servers[i].Serve()
poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1))
}
source := &mockNetmapSource{
servers: servers,
policy: p,
}
key, err := keys.NewPrivateKey()
require.NoError(t, err)
poolInitParams.SetKey(key)
poolInitParams.SetNetMapInfoSource(source)
cli, err := NewPool(poolInitParams)
require.NoError(t, err)
return cli, servers, source
}
func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) {
res := make([]*mockTreeServer, len(servers))
snapshot, err := source.NetMapSnapshot(ctx)
if err != nil {
return nil, err
}
policy, err := source.PlacementPolicy(ctx, cnr)
if err != nil {
return nil, err
}
cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:])
if err != nil {
return nil, err
}
priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:])
if err != nil {
return nil, err
}
// find servers based on public key and store pointers in res
index := 0
for i := range priorityNodes {
for j := range priorityNodes[i] {
key := priorityNodes[i][j].PublicKey()
for k := range servers {
if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) {
res[index] = servers[k]
index++
break
}
}
}
}
return res, nil
}
func TestConnectionLeak(t *testing.T) {
const (
numberOfNodes = 4
placementPolicy = "REP 2"
)
// Initialize gRPC servers and create pool with netmap source
treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
for i := range servers {
defer servers[i].Stop()
}
cnr := cidtest.ID()
ctx := context.Background()
// Make the priority node for cnr unhealthy so the pool has to redial it on every request
sortedServers, err := sortServers(ctx, servers, source, cnr)
require.NoError(t, err)
sortedServers[0].healthy = false
// Make RPC and check that pool switched to healthy server
_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
require.NoError(t, err)
require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy
require.Equal(t, 1, sortedServers[1].addCounter) // healthy
// Check that goroutines are not leaked during multiple requests
routinesBefore := runtime.NumGoroutine()
for i := 0; i < 1000; i++ {
_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
require.NoError(t, err)
}
// no more than one extra goroutine may be created due to async operations
require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1)
}
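A note on the final assertion: goroutine counts can wobble briefly while connections to the unhealthy node are torn down, so comparing against a fixed threshold right after the loop can be flaky on a loaded machine. A more tolerant variant is sketched below; it reuses routinesBefore from the test above, relies on testify's require.Eventually, and would additionally need the "time" import.

	// Allow up to a second for goroutines spawned by redial attempts to exit,
	// still tolerating the single extra goroutine used for async operations.
	require.Eventually(t, func() bool {
		return runtime.NumGoroutine()-routinesBefore <= 1
	}, time.Second, 10*time.Millisecond)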
func TestStreamRetry(t *testing.T) {
const (
numberOfNodes = 4
placementPolicy = "REP 2"
)
expected := []*tree.GetSubTreeResponse_Body{
{
NodeId: []uint64{1},
},
{
NodeId: []uint64{2},
},
{
NodeId: []uint64{3},
},
}
// Initialize gRPC servers and create pool with netmap source
treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
defer func() {
for i := range servers {
servers[i].Stop()
}
}()
cnr := cidtest.ID()
ctx := context.Background()
sortedServers, err := sortServers(ctx, servers, source, cnr)
require.NoError(t, err)
// Return the expected response from the last priority node; all the others return an error
for i := range sortedServers {
if i == len(sortedServers)-1 {
sortedServers[i].getSubTreeResponses = expected
} else {
sortedServers[i].getSubTreeError = errors.New("tree not found")
}
}
t.Run("read all", func(t *testing.T) {
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
require.NoError(t, err)
data, err := reader.ReadAll()
require.NoError(t, err)
require.Len(t, data, len(expected))
for i := range expected {
require.EqualValues(t, expected[i].GetNodeId(), data[i].GetNodeID())
}
})
t.Run("next", func(t *testing.T) {
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
require.NoError(t, err)
for i := range expected {
resp, err := reader.Next()
require.NoError(t, err)
require.Equal(t, expected[i].GetNodeId(), resp.GetNodeID())
}
_, err = reader.Next()
require.ErrorIs(t, err, io.EOF)
})
t.Run("read", func(t *testing.T) {
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
require.NoError(t, err)
buf := make([]*apitree.GetSubTreeResponseBody, len(expected))
_, err = reader.Read(buf)
require.NoError(t, err)
require.Len(t, buf, len(expected))
for i := range expected {
require.EqualValues(t, expected[i].GetNodeId(), buf[i].GetNodeID())
}
})
for i := range servers {
// each subtest above walks the full priority list once, so every node must have been tried exactly three times
require.Equal(t, 3, servers[i].getSubTreeCounter)
}
}
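Outside the tests, the "next" subtest implies the usual drain pattern for a sub-tree stream: keep calling Next until it returns io.EOF. A minimal consumption sketch, assuming the same GetSubTree/Next API exercised above (error handling collapsed into returns for brevity):

	// Drain the sub-tree stream node by node; io.EOF marks normal completion.
	reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
	if err != nil {
		return err
	}
	for {
		node, err := reader.Next()
		if errors.Is(err, io.EOF) {
			break // stream exhausted
		}
		if err != nil {
			return err
		}
		_ = node.GetNodeID() // process the node
	}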

View file

@ -10,7 +10,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
netmaptest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -417,21 +416,6 @@ func TestRetryContainerNodes(t *testing.T) {
})
}
func TestDeleteClientTwice(t *testing.T) {
p := Pool{
clientMap: makeClientMap([]netmap.NodeInfo{netmaptest.NodeInfo()}),
}
// emulate concurrent requests as consecutive requests
// to delete the same client from the map twice
for idToDelete := range p.clientMap {
p.deleteClientFromMap(idToDelete)
require.NotPanics(t, func() {
p.deleteClientFromMap(idToDelete)
})
}
require.Empty(t, p.clientMap)
}
func makeInnerPool(nodes [][]string) []*innerPool {
res := make([]*innerPool, len(nodes))

View file

@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
"github.com/mr-tron/base58"
@ -13,8 +12,9 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
)
// idFullSize is the size of ID in bytes, including prefix and checksum.
const idFullSize = util.Uint160Size + 5
const idSize = 25
var zeroSlice = bytes.Repeat([]byte{0}, idSize)
// ID identifies users of the FrostFS system.
//
@ -25,7 +25,7 @@ const idFullSize = util.Uint160Size + 5
// so it MUST be initialized using some modifying function (e.g. SetScriptHash,
// IDFromKey, etc.).
type ID struct {
w util.Uint160
w []byte
}
// ReadFromV2 reads ID from the refs.OwnerID message. Returns an error if
@ -33,7 +33,22 @@ type ID struct {
//
// See also WriteToV2.
func (x *ID) ReadFromV2(m refs.OwnerID) error {
return x.setUserID(m.GetValue())
w := m.GetValue()
if len(w) != idSize {
return fmt.Errorf("invalid length %d, expected %d", len(w), idSize)
}
if w[0] != address.NEO3Prefix {
return fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", w[0], address.NEO3Prefix)
}
if !bytes.Equal(w[21:], hash.Checksum(w[:21])) {
return errors.New("checksum mismatch")
}
x.w = w
return nil
}
// WriteToV2 writes ID to the refs.OwnerID message.
@ -41,17 +56,25 @@ func (x *ID) ReadFromV2(m refs.OwnerID) error {
//
// See also ReadFromV2.
func (x ID) WriteToV2(m *refs.OwnerID) {
m.SetValue(x.WalletBytes())
m.SetValue(x.w)
}
// SetScriptHash forms user ID from wallet address scripthash.
func (x *ID) SetScriptHash(scriptHash util.Uint160) {
x.w = scriptHash
if cap(x.w) < idSize {
x.w = make([]byte, idSize)
} else if len(x.w) < idSize {
x.w = x.w[:idSize]
}
x.w[0] = address.Prefix
copy(x.w[1:], scriptHash.BytesBE())
copy(x.w[21:], hash.Checksum(x.w[:21]))
}
// ScriptHash calculates and returns script hash of ID.
func (x *ID) ScriptHash() util.Uint160 {
return x.w
func (x *ID) ScriptHash() (util.Uint160, error) {
return util.Uint160DecodeBytesBE(x.w[1:21])
}
// WalletBytes returns FrostFS user ID as Neo3 wallet address in a binary format.
@ -60,18 +83,14 @@ func (x *ID) ScriptHash() util.Uint160 {
//
// See also Neo3 wallet docs.
func (x ID) WalletBytes() []byte {
v := make([]byte, idFullSize)
v[0] = address.Prefix
copy(v[1:], x.w[:])
copy(v[21:], hash.Checksum(v[:21]))
return v
return x.w
}
// EncodeToString encodes ID into FrostFS API V2 protocol string.
//
// See also DecodeString.
func (x ID) EncodeToString() string {
return base58.Encode(x.WalletBytes())
return base58.Encode(x.w)
}
// DecodeString decodes FrostFS API V2 protocol string. Returns an error
@ -81,11 +100,14 @@ func (x ID) EncodeToString() string {
//
// See also EncodeToString.
func (x *ID) DecodeString(s string) error {
w, err := base58.Decode(s)
var err error
x.w, err = base58.Decode(s)
if err != nil {
return fmt.Errorf("decode base58: %w", err)
}
return x.setUserID(w)
return nil
}
// String implements fmt.Stringer.
@ -99,34 +121,10 @@ func (x ID) String() string {
// Equals defines a comparison relation between two ID instances.
func (x ID) Equals(x2 ID) bool {
return x.w == x2.w
return bytes.Equal(x.w, x2.w)
}
// IsEmpty returns true if ID is the empty value.
func (x ID) IsEmpty() bool {
return x.w == util.Uint160{}
}
func (x *ID) setUserID(w []byte) error {
if len(w) != idFullSize {
return fmt.Errorf("invalid length %d, expected %d", len(w), idFullSize)
}
if w[0] != address.NEO3Prefix {
return fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", w[0], address.NEO3Prefix)
}
if !bytes.Equal(w[21:], hash.Checksum(w[:21])) {
return errors.New("checksum mismatch")
}
copy(x.w[:], w[1:21])
return nil
}
// Cmp returns an integer comparing two base58 encoded user ID lexicographically.
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
func (x ID) Cmp(x2 ID) int {
return strings.Compare(x.EncodeToString(), x2.EncodeToString())
return bytes.Equal(zeroSlice, x.w)
}
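To make the new layout concrete: an ID now stores the full 25-byte Neo3 wallet address (prefix byte, 20-byte script hash, 4-byte checksum) as a slice, so WalletBytes and EncodeToString become simple pass-throughs while ScriptHash gains an error return. A minimal round-trip sketch under that assumption (the user import path follows the repository layout shown above and may differ):

	package main

	import (
		"fmt"

		"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
		"github.com/nspcc-dev/neo-go/pkg/util"
	)

	func main() {
		var id user.ID
		id.SetScriptHash(util.Uint160{}) // fills prefix, 20-byte hash and 4-byte checksum

		fmt.Println(len(id.WalletBytes())) // 25: the stored bytes are returned as-is

		s := id.EncodeToString() // base58 over the same 25 bytes

		var decoded user.ID
		if err := decoded.DecodeString(s); err != nil {
			panic(err)
		}

		h, err := decoded.ScriptHash() // now returns (util.Uint160, error)
		if err != nil {
			panic(err)
		}
		fmt.Println(h.Equals(util.Uint160{})) // true
	}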

View file

@ -3,8 +3,6 @@ package user_test
import (
"bytes"
"crypto/rand"
"slices"
"strings"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
@ -53,7 +51,8 @@ func TestID_SetScriptHash(t *testing.T) {
func TestID_ScriptHash(t *testing.T) {
userID := usertest.ID()
scriptHash := userID.ScriptHash()
scriptHash, err := userID.ScriptHash()
require.NoError(t, err)
ownerAddress := userID.EncodeToString()
decodedScriptHash, err := address.StringToUint160(ownerAddress)
@ -134,16 +133,3 @@ func TestID_Equal(t *testing.T) {
require.True(t, id3.Equals(id1)) // commutativity
require.False(t, id1.Equals(id2))
}
func TestID_Cmp(t *testing.T) {
id1 := usertest.ID()
id2 := usertest.ID()
id3 := usertest.ID()
arr := []ID{id1, id2, id3}
slices.SortFunc(arr, ID.Cmp)
for i := 1; i < len(arr); i++ {
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
}
}