Compare commits
No commits in common. "master" and "master" have entirely different histories.
28 changed files with 1811 additions and 2699 deletions
|
@ -12,20 +12,18 @@ import (
|
|||
// SendUnary initializes communication session by RPC info, performs unary RPC
|
||||
// and closes the session.
|
||||
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
|
||||
rw, err := cli.initInternal(info, opts...)
|
||||
rw, err := cli.Init(info, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rw.WriteMessage(req)
|
||||
if err != nil {
|
||||
rw.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
err = rw.ReadMessage(resp)
|
||||
if err != nil {
|
||||
rw.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -40,28 +38,18 @@ type MessageWriterCloser interface {
|
|||
}
|
||||
|
||||
type clientStreamWriterCloser struct {
|
||||
sw *streamWrapper
|
||||
MessageReadWriter
|
||||
|
||||
resp message.Message
|
||||
}
|
||||
|
||||
// WriteMessage implements MessageWriterCloser.
|
||||
func (c *clientStreamWriterCloser) WriteMessage(m message.Message) error {
|
||||
return c.sw.WriteMessage(m)
|
||||
}
|
||||
|
||||
func (c *clientStreamWriterCloser) Close() error {
|
||||
err := c.sw.closeSend()
|
||||
err := c.MessageReadWriter.Close()
|
||||
if err != nil {
|
||||
c.sw.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.sw.ReadMessage(c.resp); err != nil {
|
||||
c.sw.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
return c.sw.Close()
|
||||
return c.ReadMessage(c.resp)
|
||||
}
|
||||
|
||||
// OpenClientStream initializes communication session by RPC info, opens client-side stream
|
||||
|
@ -69,14 +57,14 @@ func (c *clientStreamWriterCloser) Close() error {
|
|||
//
|
||||
// All stream writes must be performed before the closing. Close must be called once.
|
||||
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
|
||||
rw, err := cli.initInternal(info, opts...)
|
||||
rw, err := cli.Init(info, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &clientStreamWriterCloser{
|
||||
sw: rw,
|
||||
resp: resp,
|
||||
MessageReadWriter: rw,
|
||||
resp: resp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -88,7 +76,7 @@ type MessageReaderCloser interface {
|
|||
}
|
||||
|
||||
type serverStreamReaderCloser struct {
|
||||
rw *streamWrapper
|
||||
rw MessageReadWriter
|
||||
|
||||
once sync.Once
|
||||
|
||||
|
@ -103,15 +91,11 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
s.rw.cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.rw.ReadMessage(msg)
|
||||
if !errors.Is(err, io.EOF) {
|
||||
if err != nil {
|
||||
s.rw.cancel()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -128,7 +112,7 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
|
|||
//
|
||||
// All stream reads must be performed before the closing. Close must be called once.
|
||||
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
|
||||
rw, err := cli.initInternal(info, opts...)
|
||||
rw, err := cli.Init(info, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -41,10 +41,6 @@ type MessageReadWriter interface {
|
|||
|
||||
// Init initiates a messaging session and returns the interface for message transmitting.
|
||||
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
|
||||
return c.initInternal(info, opts...)
|
||||
}
|
||||
|
||||
func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*streamWrapper, error) {
|
||||
prm := defaultCallParameters()
|
||||
|
||||
for _, opt := range opts {
|
||||
|
@ -65,13 +61,7 @@ func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*
|
|||
// would propagate to all subsequent read/write operations on the opened stream,
|
||||
// which is not desired for the stream's lifecycle management.
|
||||
dialTimeoutTimer := time.NewTimer(c.dialTimeout)
|
||||
defer func() {
|
||||
dialTimeoutTimer.Stop()
|
||||
select {
|
||||
case <-dialTimeoutTimer.C:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
defer dialTimeoutTimer.Stop()
|
||||
|
||||
type newStreamRes struct {
|
||||
stream grpc.ClientStream
|
||||
|
@ -101,7 +91,7 @@ func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*
|
|||
if res.stream != nil && res.err == nil {
|
||||
_ = res.stream.CloseSend()
|
||||
}
|
||||
return nil, context.DeadlineExceeded
|
||||
return nil, context.Canceled
|
||||
case res = <-newStreamCh:
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,6 @@ func (c *cfg) initDefault() {
|
|||
c.rwTimeout = defaultRWTimeout
|
||||
c.grpcDialOpts = []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDisableServiceConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,13 +34,8 @@ func (w streamWrapper) WriteMessage(m message.Message) error {
|
|||
})
|
||||
}
|
||||
|
||||
func (w *streamWrapper) closeSend() error {
|
||||
return w.withTimeout(w.ClientStream.CloseSend)
|
||||
}
|
||||
|
||||
func (w *streamWrapper) Close() error {
|
||||
w.cancel()
|
||||
return nil
|
||||
return w.withTimeout(w.ClientStream.CloseSend)
|
||||
}
|
||||
|
||||
func (w *streamWrapper) withTimeout(closure func() error) error {
|
||||
|
@ -55,10 +50,6 @@ func (w *streamWrapper) withTimeout(closure func() error) error {
|
|||
select {
|
||||
case err := <-ch:
|
||||
tt.Stop()
|
||||
select {
|
||||
case <-tt.C:
|
||||
default:
|
||||
}
|
||||
return err
|
||||
case <-tt.C:
|
||||
w.cancel()
|
||||
|
|
64
api/status/grpc/types.pb.go
generated
64
api/status/grpc/types.pb.go
generated
|
@ -144,9 +144,6 @@ const (
|
|||
// request parameter as the client sent it incorrectly, then this code should
|
||||
// be used.
|
||||
CommonFail_INVALID_ARGUMENT CommonFail = 4
|
||||
// [**1029**] Resource exhausted failure. If the operation cannot be performed
|
||||
// due to a lack of resources.
|
||||
CommonFail_RESOURCE_EXHAUSTED CommonFail = 5
|
||||
)
|
||||
|
||||
// Enum value maps for CommonFail.
|
||||
|
@ -157,7 +154,6 @@ var (
|
|||
2: "SIGNATURE_VERIFICATION_FAIL",
|
||||
3: "NODE_UNDER_MAINTENANCE",
|
||||
4: "INVALID_ARGUMENT",
|
||||
5: "RESOURCE_EXHAUSTED",
|
||||
}
|
||||
CommonFail_value = map[string]int32{
|
||||
"INTERNAL": 0,
|
||||
|
@ -165,7 +161,6 @@ var (
|
|||
"SIGNATURE_VERIFICATION_FAIL": 2,
|
||||
"NODE_UNDER_MAINTENANCE": 3,
|
||||
"INVALID_ARGUMENT": 4,
|
||||
"RESOURCE_EXHAUSTED": 5,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -659,7 +654,7 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
|
|||
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x04,
|
||||
0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x45, 0x5f,
|
||||
0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x10, 0x05, 0x2a, 0x11, 0x0a, 0x07, 0x53, 0x75, 0x63,
|
||||
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x9d, 0x01, 0x0a,
|
||||
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x85, 0x01, 0x0a,
|
||||
0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x0c, 0x0a, 0x08, 0x49,
|
||||
0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x57, 0x52, 0x4f,
|
||||
0x4e, 0x47, 0x5f, 0x4d, 0x41, 0x47, 0x49, 0x43, 0x5f, 0x4e, 0x55, 0x4d, 0x42, 0x45, 0x52, 0x10,
|
||||
|
@ -668,35 +663,34 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
|
|||
0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x44, 0x45, 0x52,
|
||||
0x5f, 0x4d, 0x41, 0x49, 0x4e, 0x54, 0x45, 0x4e, 0x41, 0x4e, 0x43, 0x45, 0x10, 0x03, 0x12, 0x14,
|
||||
0x0a, 0x10, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45,
|
||||
0x4e, 0x54, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
|
||||
0x5f, 0x45, 0x58, 0x48, 0x41, 0x55, 0x53, 0x54, 0x45, 0x44, 0x10, 0x05, 0x2a, 0x88, 0x01, 0x0a,
|
||||
0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53,
|
||||
0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42,
|
||||
0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01,
|
||||
0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
|
||||
0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52,
|
||||
0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a,
|
||||
0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f,
|
||||
0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f,
|
||||
0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61,
|
||||
0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45,
|
||||
0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a,
|
||||
0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10,
|
||||
0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41,
|
||||
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31,
|
||||
0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b,
|
||||
0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11,
|
||||
0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10,
|
||||
0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12,
|
||||
0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41,
|
||||
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62,
|
||||
0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e,
|
||||
0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f,
|
||||
0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61,
|
||||
0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73,
|
||||
0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a, 0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
|
||||
0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74,
|
||||
0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
|
||||
0x4e, 0x54, 0x10, 0x04, 0x2a, 0x88, 0x01, 0x0a, 0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12,
|
||||
0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44,
|
||||
0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54,
|
||||
0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b,
|
||||
0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e,
|
||||
0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52, 0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10,
|
||||
0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45,
|
||||
0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a,
|
||||
0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f, 0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a,
|
||||
0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13,
|
||||
0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f,
|
||||
0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f,
|
||||
0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
|
||||
0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
|
||||
0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
|
||||
0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46,
|
||||
0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f,
|
||||
0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45,
|
||||
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d,
|
||||
0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
|
||||
0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72,
|
||||
0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43,
|
||||
0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d,
|
||||
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75,
|
||||
0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a,
|
||||
0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e,
|
||||
0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
|
||||
}
|
||||
|
||||
var file_api_status_grpc_types_proto_enumTypes = make([]protoimpl.EnumInfo, 7)
|
||||
|
|
64
api/status/grpc/types_protoopaque.pb.go
generated
64
api/status/grpc/types_protoopaque.pb.go
generated
|
@ -144,9 +144,6 @@ const (
|
|||
// request parameter as the client sent it incorrectly, then this code should
|
||||
// be used.
|
||||
CommonFail_INVALID_ARGUMENT CommonFail = 4
|
||||
// [**1029**] Resource exhausted failure. If the operation cannot be performed
|
||||
// due to a lack of resources.
|
||||
CommonFail_RESOURCE_EXHAUSTED CommonFail = 5
|
||||
)
|
||||
|
||||
// Enum value maps for CommonFail.
|
||||
|
@ -157,7 +154,6 @@ var (
|
|||
2: "SIGNATURE_VERIFICATION_FAIL",
|
||||
3: "NODE_UNDER_MAINTENANCE",
|
||||
4: "INVALID_ARGUMENT",
|
||||
5: "RESOURCE_EXHAUSTED",
|
||||
}
|
||||
CommonFail_value = map[string]int32{
|
||||
"INTERNAL": 0,
|
||||
|
@ -165,7 +161,6 @@ var (
|
|||
"SIGNATURE_VERIFICATION_FAIL": 2,
|
||||
"NODE_UNDER_MAINTENANCE": 3,
|
||||
"INVALID_ARGUMENT": 4,
|
||||
"RESOURCE_EXHAUSTED": 5,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -681,7 +676,7 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
|
|||
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x04,
|
||||
0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x45, 0x5f,
|
||||
0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x10, 0x05, 0x2a, 0x11, 0x0a, 0x07, 0x53, 0x75, 0x63,
|
||||
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x9d, 0x01, 0x0a,
|
||||
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x85, 0x01, 0x0a,
|
||||
0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x0c, 0x0a, 0x08, 0x49,
|
||||
0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x57, 0x52, 0x4f,
|
||||
0x4e, 0x47, 0x5f, 0x4d, 0x41, 0x47, 0x49, 0x43, 0x5f, 0x4e, 0x55, 0x4d, 0x42, 0x45, 0x52, 0x10,
|
||||
|
@ -690,35 +685,34 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
|
|||
0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x44, 0x45, 0x52,
|
||||
0x5f, 0x4d, 0x41, 0x49, 0x4e, 0x54, 0x45, 0x4e, 0x41, 0x4e, 0x43, 0x45, 0x10, 0x03, 0x12, 0x14,
|
||||
0x0a, 0x10, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45,
|
||||
0x4e, 0x54, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
|
||||
0x5f, 0x45, 0x58, 0x48, 0x41, 0x55, 0x53, 0x54, 0x45, 0x44, 0x10, 0x05, 0x2a, 0x88, 0x01, 0x0a,
|
||||
0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53,
|
||||
0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42,
|
||||
0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01,
|
||||
0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
|
||||
0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52,
|
||||
0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a,
|
||||
0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f,
|
||||
0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f,
|
||||
0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61,
|
||||
0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45,
|
||||
0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a,
|
||||
0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10,
|
||||
0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41,
|
||||
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31,
|
||||
0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b,
|
||||
0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11,
|
||||
0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10,
|
||||
0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12,
|
||||
0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41,
|
||||
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62,
|
||||
0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e,
|
||||
0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f,
|
||||
0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61,
|
||||
0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73,
|
||||
0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a, 0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
|
||||
0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74,
|
||||
0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
|
||||
0x4e, 0x54, 0x10, 0x04, 0x2a, 0x88, 0x01, 0x0a, 0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12,
|
||||
0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44,
|
||||
0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54,
|
||||
0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b,
|
||||
0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e,
|
||||
0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52, 0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10,
|
||||
0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45,
|
||||
0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a,
|
||||
0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f, 0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a,
|
||||
0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13,
|
||||
0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f,
|
||||
0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f,
|
||||
0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
|
||||
0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
|
||||
0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
|
||||
0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46,
|
||||
0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f,
|
||||
0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45,
|
||||
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d,
|
||||
0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
|
||||
0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72,
|
||||
0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43,
|
||||
0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d,
|
||||
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75,
|
||||
0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a,
|
||||
0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e,
|
||||
0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
|
||||
}
|
||||
|
||||
var file_api_status_grpc_types_proto_enumTypes = make([]protoimpl.EnumInfo, 7)
|
||||
|
|
|
@ -65,8 +65,6 @@ const (
|
|||
NodeUnderMaintenance
|
||||
// InvalidArgument is a local Code value for INVALID_ARGUMENT status.
|
||||
InvalidArgument
|
||||
// ResourceExhausted is a local Code value for RESOURCE_EXHAUSTED status.
|
||||
ResourceExhausted
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -296,62 +296,3 @@ func (x *InvalidArgument) SetMessage(v string) {
|
|||
func (x InvalidArgument) Message() string {
|
||||
return x.v2.Message()
|
||||
}
|
||||
|
||||
// ResourceExhausted is a failure status indicating that
|
||||
// the operation cannot be performed due to a lack of resources.
|
||||
// Instances provide Status and StatusV2 interfaces.
|
||||
type ResourceExhausted struct {
|
||||
v2 status.Status
|
||||
}
|
||||
|
||||
const defaultResourceExhaustedMsg = "resource exhausted"
|
||||
|
||||
// Error implements the error interface.
|
||||
func (x *ResourceExhausted) Error() string {
|
||||
msg := x.v2.Message()
|
||||
if msg == "" {
|
||||
msg = defaultResourceExhaustedMsg
|
||||
}
|
||||
|
||||
return errMessageStatusV2(
|
||||
globalizeCodeV2(status.ResourceExhausted, status.GlobalizeCommonFail),
|
||||
msg,
|
||||
)
|
||||
}
|
||||
|
||||
// implements local interface defined in FromStatusV2 func.
|
||||
func (x *ResourceExhausted) fromStatusV2(st *status.Status) {
|
||||
x.v2 = *st
|
||||
}
|
||||
|
||||
// ToStatusV2 implements StatusV2 interface method.
|
||||
// If the value was returned by FromStatusV2, returns the source message.
|
||||
// Otherwise, returns message with
|
||||
// - code: RESOURCE_EXHAUSTED;
|
||||
// - string message: written message via SetMessage or
|
||||
// "resource exhausted" as a default message;
|
||||
// - details: empty.
|
||||
func (x ResourceExhausted) ToStatusV2() *status.Status {
|
||||
x.v2.SetCode(globalizeCodeV2(status.ResourceExhausted, status.GlobalizeCommonFail))
|
||||
if x.v2.Message() == "" {
|
||||
x.v2.SetMessage(defaultResourceExhaustedMsg)
|
||||
}
|
||||
|
||||
return &x.v2
|
||||
}
|
||||
|
||||
// SetMessage writes invalid argument failure message.
|
||||
// Message should be used for debug purposes only.
|
||||
//
|
||||
// See also Message.
|
||||
func (x *ResourceExhausted) SetMessage(v string) {
|
||||
x.v2.SetMessage(v)
|
||||
}
|
||||
|
||||
// Message returns status message. Zero status returns empty message.
|
||||
// Message should be used for debug purposes only.
|
||||
//
|
||||
// See also SetMessage.
|
||||
func (x ResourceExhausted) Message() string {
|
||||
return x.v2.Message()
|
||||
}
|
||||
|
|
|
@ -167,42 +167,3 @@ func TestInvalidArgument(t *testing.T) {
|
|||
require.Equal(t, msg, stV2.Message())
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourceExhausted(t *testing.T) {
|
||||
t.Run("default", func(t *testing.T) {
|
||||
var st apistatus.ResourceExhausted
|
||||
|
||||
require.Empty(t, st.Message())
|
||||
})
|
||||
|
||||
t.Run("custom message", func(t *testing.T) {
|
||||
var st apistatus.ResourceExhausted
|
||||
msg := "some message"
|
||||
|
||||
st.SetMessage(msg)
|
||||
|
||||
stV2 := st.ToStatusV2()
|
||||
|
||||
require.Equal(t, msg, st.Message())
|
||||
require.Equal(t, msg, stV2.Message())
|
||||
})
|
||||
|
||||
t.Run("empty to V2", func(t *testing.T) {
|
||||
var st apistatus.ResourceExhausted
|
||||
|
||||
stV2 := st.ToStatusV2()
|
||||
|
||||
require.Equal(t, "resource exhausted", stV2.Message())
|
||||
})
|
||||
|
||||
t.Run("non-empty to V2", func(t *testing.T) {
|
||||
var st apistatus.ResourceExhausted
|
||||
msg := "some other msg"
|
||||
|
||||
st.SetMessage(msg)
|
||||
|
||||
stV2 := st.ToStatusV2()
|
||||
|
||||
require.Equal(t, msg, stV2.Message())
|
||||
})
|
||||
}
|
||||
|
|
|
@ -80,8 +80,6 @@ func FromStatusV2(st *status.Status) Status {
|
|||
decoder = new(NodeUnderMaintenance)
|
||||
case status.InvalidArgument:
|
||||
decoder = new(InvalidArgument)
|
||||
case status.ResourceExhausted:
|
||||
decoder = new(ResourceExhausted)
|
||||
}
|
||||
case object.LocalizeFailStatus(&code):
|
||||
switch code {
|
||||
|
|
|
@ -3,7 +3,6 @@ package cid
|
|||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
"github.com/mr-tron/base58"
|
||||
|
@ -114,9 +113,3 @@ func (id *ID) DecodeString(s string) error {
|
|||
func (id ID) String() string {
|
||||
return id.EncodeToString()
|
||||
}
|
||||
|
||||
// Cmp returns an integer comparing two base58 encoded container ID lexicographically.
|
||||
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
|
||||
func (id ID) Cmp(id2 ID) int {
|
||||
return strings.Compare(id.EncodeToString(), id2.EncodeToString())
|
||||
}
|
||||
|
|
|
@ -3,8 +3,6 @@ package cid_test
|
|||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
|
@ -108,17 +106,3 @@ func TestID_Encode(t *testing.T) {
|
|||
require.Equal(t, emptyID, id.EncodeToString())
|
||||
})
|
||||
}
|
||||
|
||||
func TestID_Cmp(t *testing.T) {
|
||||
var arr []cid.ID
|
||||
for i := 0; i < 3; i++ {
|
||||
checksum := randSHA256Checksum()
|
||||
arr = append(arr, cidtest.IDWithChecksum(checksum))
|
||||
}
|
||||
|
||||
slices.SortFunc(arr, cid.ID.Cmp)
|
||||
|
||||
for i := 1; i < len(arr); i++ {
|
||||
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
|
||||
|
@ -168,9 +167,3 @@ func (id *ID) UnmarshalJSON(data []byte) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cmp returns an integer comparing two base58 encoded object ID lexicographically.
|
||||
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
|
||||
func (id ID) Cmp(id2 ID) int {
|
||||
return strings.Compare(id.EncodeToString(), id2.EncodeToString())
|
||||
}
|
||||
|
|
|
@ -3,9 +3,7 @@ package oid
|
|||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
|
@ -182,16 +180,3 @@ func TestID_Encode(t *testing.T) {
|
|||
require.Equal(t, emptyID, id.EncodeToString())
|
||||
})
|
||||
}
|
||||
|
||||
func TestID_Cmp(t *testing.T) {
|
||||
id1 := randID(t)
|
||||
id2 := randID(t)
|
||||
id3 := randID(t)
|
||||
|
||||
arr := []ID{id1, id2, id3}
|
||||
|
||||
slices.SortFunc(arr, ID.Cmp)
|
||||
for i := 1; i < len(arr); i++ {
|
||||
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"crypto/sha256"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -328,11 +327,4 @@ func (s *payloadSizeLimiter) prepareFirstChild() {
|
|||
s.current.SetAttributes()
|
||||
|
||||
// attributes will be added to parent in detachParent
|
||||
|
||||
// add expiration epoch to each part
|
||||
for _, attr := range s.parAttrs {
|
||||
if attr.Key() == objectV2.SysAttributeExpEpoch {
|
||||
s.current.SetAttributes(attr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
1283
pool/client.go
1283
pool/client.go
File diff suppressed because it is too large
Load diff
|
@ -1,330 +0,0 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
type innerPool struct {
|
||||
lock sync.RWMutex
|
||||
sampler *sampler
|
||||
clients []client
|
||||
}
|
||||
|
||||
type connectionManager struct {
|
||||
innerPools []*innerPool
|
||||
rebalanceParams rebalanceParameters
|
||||
clientBuilder clientBuilder
|
||||
logger *zap.Logger
|
||||
healthChecker *healthCheck
|
||||
}
|
||||
|
||||
// newConnectionManager returns an instance of connectionManager configured according to the parameters.
|
||||
//
|
||||
// Before using connectionManager, you MUST call Dial.
|
||||
func newConnectionManager(options InitParameters) (*connectionManager, error) {
|
||||
if options.key == nil {
|
||||
return nil, fmt.Errorf("missed required parameter 'Key'")
|
||||
}
|
||||
|
||||
nodesParams, err := adjustNodeParams(options.nodeParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
manager := &connectionManager{
|
||||
logger: options.logger,
|
||||
rebalanceParams: rebalanceParameters{
|
||||
nodesParams: nodesParams,
|
||||
nodeRequestTimeout: options.healthcheckTimeout,
|
||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||
sessionExpirationDuration: options.sessionExpirationDuration,
|
||||
},
|
||||
clientBuilder: options.clientBuilder,
|
||||
}
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
|
||||
func (cm *connectionManager) dial(ctx context.Context) error {
|
||||
inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
|
||||
var atLeastOneHealthy bool
|
||||
|
||||
for i, params := range cm.rebalanceParams.nodesParams {
|
||||
clients := make([]client, len(params.weights))
|
||||
for j, addr := range params.addresses {
|
||||
clients[j] = cm.clientBuilder(addr)
|
||||
if err := clients[j].dial(ctx); err != nil {
|
||||
cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
atLeastOneHealthy = true
|
||||
}
|
||||
source := rand.NewSource(time.Now().UnixNano())
|
||||
sampl := newSampler(params.weights, source)
|
||||
|
||||
inner[i] = &innerPool{
|
||||
sampler: sampl,
|
||||
clients: clients,
|
||||
}
|
||||
}
|
||||
|
||||
if !atLeastOneHealthy {
|
||||
return fmt.Errorf("at least one node must be healthy")
|
||||
}
|
||||
|
||||
cm.innerPools = inner
|
||||
|
||||
cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
|
||||
cm.healthChecker.startRebalance(ctx, cm.rebalance)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *connectionManager) rebalance(ctx context.Context) {
|
||||
buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
|
||||
for i, params := range cm.rebalanceParams.nodesParams {
|
||||
buffers[i] = make([]float64, len(params.weights))
|
||||
}
|
||||
|
||||
cm.updateNodesHealth(ctx, buffers)
|
||||
}
|
||||
|
||||
func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||
if cm.logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
cm.logger.Log(level, msg, fields...)
|
||||
}
|
||||
|
||||
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
|
||||
if len(nodeParams) == 0 {
|
||||
return nil, errors.New("no FrostFS peers configured")
|
||||
}
|
||||
|
||||
nodesParamsMap := make(map[int]*nodesParam)
|
||||
for _, param := range nodeParams {
|
||||
nodes, ok := nodesParamsMap[param.priority]
|
||||
if !ok {
|
||||
nodes = &nodesParam{priority: param.priority}
|
||||
}
|
||||
nodes.addresses = append(nodes.addresses, param.address)
|
||||
nodes.weights = append(nodes.weights, param.weight)
|
||||
nodesParamsMap[param.priority] = nodes
|
||||
}
|
||||
|
||||
nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
|
||||
for _, nodes := range nodesParamsMap {
|
||||
nodes.weights = adjustWeights(nodes.weights)
|
||||
nodesParams = append(nodesParams, nodes)
|
||||
}
|
||||
|
||||
sort.Slice(nodesParams, func(i, j int) bool {
|
||||
return nodesParams[i].priority < nodesParams[j].priority
|
||||
})
|
||||
|
||||
return nodesParams, nil
|
||||
}
|
||||
|
||||
func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
||||
wg := sync.WaitGroup{}
|
||||
for i, inner := range cm.innerPools {
|
||||
wg.Add(1)
|
||||
|
||||
bufferWeights := buffers[i]
|
||||
go func(i int, _ *innerPool) {
|
||||
defer wg.Done()
|
||||
cm.updateInnerNodesHealth(ctx, i, bufferWeights)
|
||||
}(i, inner)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
|
||||
if i > len(cm.innerPools)-1 {
|
||||
return
|
||||
}
|
||||
pool := cm.innerPools[i]
|
||||
options := cm.rebalanceParams
|
||||
|
||||
healthyChanged := new(atomic.Bool)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for j, cli := range pool.clients {
|
||||
wg.Add(1)
|
||||
go func(j int, cli client) {
|
||||
defer wg.Done()
|
||||
|
||||
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
||||
defer c()
|
||||
|
||||
changed, err := restartIfUnhealthy(tctx, cli)
|
||||
healthy := err == nil
|
||||
if healthy {
|
||||
bufferWeights[j] = options.nodesParams[i].weights[j]
|
||||
} else {
|
||||
bufferWeights[j] = 0
|
||||
}
|
||||
|
||||
if changed {
|
||||
fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
|
||||
if err != nil {
|
||||
fields = append(fields, zap.String("reason", err.Error()))
|
||||
}
|
||||
|
||||
cm.log(zap.DebugLevel, "health has changed", fields...)
|
||||
healthyChanged.Store(true)
|
||||
}
|
||||
}(j, cli)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if healthyChanged.Load() {
|
||||
probabilities := adjustWeights(bufferWeights)
|
||||
source := rand.NewSource(time.Now().UnixNano())
|
||||
pool.lock.Lock()
|
||||
pool.sampler = newSampler(probabilities, source)
|
||||
pool.lock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
|
||||
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
|
||||
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.setUnhealthy()
|
||||
} else {
|
||||
c.setHealthy()
|
||||
}
|
||||
}()
|
||||
|
||||
wasHealthy := c.isHealthy()
|
||||
|
||||
if res, err := c.healthcheck(ctx); err == nil {
|
||||
if res.Status().IsMaintenance() {
|
||||
return wasHealthy, new(apistatus.NodeUnderMaintenance)
|
||||
}
|
||||
|
||||
return !wasHealthy, nil
|
||||
}
|
||||
|
||||
if err = c.restart(ctx); err != nil {
|
||||
return wasHealthy, err
|
||||
}
|
||||
|
||||
res, err := c.healthcheck(ctx)
|
||||
if err != nil {
|
||||
return wasHealthy, err
|
||||
}
|
||||
|
||||
if res.Status().IsMaintenance() {
|
||||
return wasHealthy, new(apistatus.NodeUnderMaintenance)
|
||||
}
|
||||
|
||||
return !wasHealthy, nil
|
||||
}
|
||||
|
||||
func adjustWeights(weights []float64) []float64 {
|
||||
adjusted := make([]float64, len(weights))
|
||||
sum := 0.0
|
||||
for _, weight := range weights {
|
||||
sum += weight
|
||||
}
|
||||
if sum > 0 {
|
||||
for i, weight := range weights {
|
||||
adjusted[i] = weight / sum
|
||||
}
|
||||
}
|
||||
|
||||
return adjusted
|
||||
}
|
||||
|
||||
func (cm *connectionManager) connection() (client, error) {
|
||||
for _, inner := range cm.innerPools {
|
||||
cp, err := inner.connection()
|
||||
if err == nil {
|
||||
return cp, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("no healthy client")
|
||||
}
|
||||
|
||||
// iterate iterates over all clients in all innerPools.
|
||||
func (cm *connectionManager) iterate(cb func(client)) {
|
||||
for _, inner := range cm.innerPools {
|
||||
for _, cl := range inner.clients {
|
||||
if cl.isHealthy() {
|
||||
cb(cl)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *innerPool) connection() (client, error) {
|
||||
p.lock.RLock() // need lock because of using p.sampler
|
||||
defer p.lock.RUnlock()
|
||||
if len(p.clients) == 1 {
|
||||
cp := p.clients[0]
|
||||
if cp.isHealthy() {
|
||||
return cp, nil
|
||||
}
|
||||
return nil, errors.New("no healthy client")
|
||||
}
|
||||
attempts := 3 * len(p.clients)
|
||||
for range attempts {
|
||||
i := p.sampler.Next()
|
||||
if cp := p.clients[i]; cp.isHealthy() {
|
||||
return cp, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("no healthy client")
|
||||
}
|
||||
|
||||
func (cm connectionManager) Statistic() Statistic {
|
||||
stat := Statistic{}
|
||||
for _, inner := range cm.innerPools {
|
||||
nodes := make([]string, 0, len(inner.clients))
|
||||
for _, cl := range inner.clients {
|
||||
if cl.isHealthy() {
|
||||
nodes = append(nodes, cl.address())
|
||||
}
|
||||
node := NodeStatistic{
|
||||
address: cl.address(),
|
||||
methods: cl.methodsStatus(),
|
||||
overallErrors: cl.overallErrorRate(),
|
||||
currentErrors: cl.currentErrorRate(),
|
||||
}
|
||||
stat.nodes = append(stat.nodes, node)
|
||||
stat.overallErrors += node.overallErrors
|
||||
}
|
||||
if len(stat.currentNodes) == 0 {
|
||||
stat.currentNodes = nodes
|
||||
}
|
||||
}
|
||||
|
||||
return stat
|
||||
}
|
||||
|
||||
func (cm *connectionManager) close() {
|
||||
cm.healthChecker.stopRebalance()
|
||||
|
||||
// close all clients
|
||||
for _, pools := range cm.innerPools {
|
||||
for _, cli := range pools.clients {
|
||||
_ = cli.close()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type healthCheck struct {
|
||||
cancel context.CancelFunc
|
||||
closedCh chan struct{}
|
||||
|
||||
clientRebalanceInterval time.Duration
|
||||
}
|
||||
|
||||
func newHealthCheck(clientRebalanceInterval time.Duration) *healthCheck {
|
||||
var h healthCheck
|
||||
h.clientRebalanceInterval = clientRebalanceInterval
|
||||
h.closedCh = make(chan struct{})
|
||||
return &h
|
||||
}
|
||||
|
||||
// startRebalance runs loop to monitor connection healthy status.
|
||||
func (h *healthCheck) startRebalance(ctx context.Context, callback func(ctx context.Context)) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
h.cancel = cancel
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(h.clientRebalanceInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(h.closedCh)
|
||||
return
|
||||
case <-ticker.C:
|
||||
callback(ctx)
|
||||
ticker.Reset(h.clientRebalanceInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (h *healthCheck) stopRebalance() {
|
||||
h.cancel()
|
||||
<-h.closedCh
|
||||
}
|
1709
pool/pool.go
1709
pool/pool.go
File diff suppressed because it is too large
Load diff
|
@ -104,7 +104,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
|||
|
||||
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
||||
condition := func() bool {
|
||||
cp, err := clientPool.manager.connection()
|
||||
cp, err := clientPool.connection()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
t.Cleanup(pool.Close)
|
||||
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
||||
|
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
t.Cleanup(pool.Close)
|
||||
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||
|
@ -220,12 +220,13 @@ func TestOneOfTwoFailed(t *testing.T) {
|
|||
err = pool.Dial(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(pool.Close)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
for range 5 {
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||
|
@ -368,7 +369,7 @@ func TestUpdateNodesHealth(t *testing.T) {
|
|||
tc.prepareCli(cli)
|
||||
p, log := newPool(t, cli)
|
||||
|
||||
p.manager.updateNodesHealth(ctx, [][]float64{{1}})
|
||||
p.updateNodesHealth(ctx, [][]float64{{1}})
|
||||
|
||||
changed := tc.wasHealthy != tc.willHealthy
|
||||
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
|
||||
|
@ -384,19 +385,19 @@ func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
|
|||
require.NoError(t, err)
|
||||
|
||||
return &Pool{
|
||||
cache: cache,
|
||||
key: newPrivateKey(t),
|
||||
manager: &connectionManager{
|
||||
innerPools: []*innerPool{{
|
||||
sampler: newSampler([]float64{1}, rand.NewSource(0)),
|
||||
clients: []client{cli},
|
||||
}},
|
||||
healthChecker: newHealthCheck(200 * time.Millisecond),
|
||||
rebalanceParams: rebalanceParameters{
|
||||
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
|
||||
nodeRequestTimeout: time.Second,
|
||||
},
|
||||
logger: log},
|
||||
innerPools: []*innerPool{{
|
||||
sampler: newSampler([]float64{1}, rand.NewSource(0)),
|
||||
clients: []client{cli},
|
||||
}},
|
||||
cache: cache,
|
||||
key: newPrivateKey(t),
|
||||
closedCh: make(chan struct{}),
|
||||
rebalanceParams: rebalanceParameters{
|
||||
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
|
||||
nodeRequestTimeout: time.Second,
|
||||
clientRebalanceInterval: 200 * time.Millisecond,
|
||||
},
|
||||
logger: log,
|
||||
}, observedLog
|
||||
}
|
||||
|
||||
|
@ -434,7 +435,7 @@ func TestTwoFailed(t *testing.T) {
|
|||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
_, err = pool.manager.connection()
|
||||
_, err = pool.connection()
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "no healthy")
|
||||
}
|
||||
|
@ -468,7 +469,7 @@ func TestSessionCache(t *testing.T) {
|
|||
t.Cleanup(pool.Close)
|
||||
|
||||
// cache must contain session token
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
@ -481,7 +482,7 @@ func TestSessionCache(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
|
||||
// cache must not contain session token
|
||||
cp, err = pool.manager.connection()
|
||||
cp, err = pool.connection()
|
||||
require.NoError(t, err)
|
||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.False(t, ok)
|
||||
|
@ -493,7 +494,7 @@ func TestSessionCache(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// cache must contain session token
|
||||
cp, err = pool.manager.connection()
|
||||
cp, err = pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
@ -537,7 +538,7 @@ func TestPriority(t *testing.T) {
|
|||
|
||||
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
|
||||
firstNode := func() bool {
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey1)
|
||||
|
@ -545,7 +546,7 @@ func TestPriority(t *testing.T) {
|
|||
|
||||
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
||||
secondNode := func() bool {
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey2)
|
||||
|
@ -582,7 +583,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// cache must contain session token
|
||||
cp, err := pool.manager.connection()
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
@ -635,8 +636,9 @@ func TestSessionTokenOwner(t *testing.T) {
|
|||
cc.sessionTarget = func(tok session.Object) {
|
||||
tkn = tok
|
||||
}
|
||||
err = p.initCall(&cc, prm, prmCtx)
|
||||
err = p.initCallContext(&cc, prm, prmCtx)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = p.openDefaultSession(ctx, &cc)
|
||||
require.NoError(t, err)
|
||||
require.True(t, tkn.VerifySignature())
|
||||
|
@ -920,14 +922,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
|
|||
t.Cleanup(pool.Close)
|
||||
|
||||
for range errorThreshold {
|
||||
conn, err := pool.manager.connection()
|
||||
conn, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, nodes[0].address, conn.address())
|
||||
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
conn, err := pool.manager.connection()
|
||||
conn, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, nodes[1].address, conn.address())
|
||||
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
||||
|
|
|
@ -47,6 +47,9 @@ func TestHealthyReweight(t *testing.T) {
|
|||
buffer = make([]float64, len(weights))
|
||||
)
|
||||
|
||||
cache, err := newCache(0)
|
||||
require.NoError(t, err)
|
||||
|
||||
client1 := newMockClient(names[0], *newPrivateKey(t))
|
||||
client1.errOnDial()
|
||||
|
||||
|
@ -56,20 +59,22 @@ func TestHealthyReweight(t *testing.T) {
|
|||
sampler: newSampler(weights, rand.NewSource(0)),
|
||||
clients: []client{client1, client2},
|
||||
}
|
||||
cm := &connectionManager{
|
||||
p := &Pool{
|
||||
innerPools: []*innerPool{inner},
|
||||
cache: cache,
|
||||
key: newPrivateKey(t),
|
||||
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
||||
}
|
||||
|
||||
// check getting first node connection before rebalance happened
|
||||
connection0, err := cm.connection()
|
||||
connection0, err := p.connection()
|
||||
require.NoError(t, err)
|
||||
mock0 := connection0.(*mockClient)
|
||||
require.Equal(t, names[0], mock0.address())
|
||||
|
||||
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||
|
||||
connection1, err := cm.connection()
|
||||
connection1, err := p.connection()
|
||||
require.NoError(t, err)
|
||||
mock1 := connection1.(*mockClient)
|
||||
require.Equal(t, names[1], mock1.address())
|
||||
|
@ -79,10 +84,10 @@ func TestHealthyReweight(t *testing.T) {
|
|||
inner.clients[0] = newMockClient(names[0], *newPrivateKey(t))
|
||||
inner.lock.Unlock()
|
||||
|
||||
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||
inner.sampler = newSampler(weights, rand.NewSource(0))
|
||||
|
||||
connection0, err = cm.connection()
|
||||
connection0, err = p.connection()
|
||||
require.NoError(t, err)
|
||||
mock0 = connection0.(*mockClient)
|
||||
require.Equal(t, names[0], mock0.address())
|
||||
|
@ -103,12 +108,12 @@ func TestHealthyNoReweight(t *testing.T) {
|
|||
newMockClient(names[1], *newPrivateKey(t)),
|
||||
},
|
||||
}
|
||||
cm := &connectionManager{
|
||||
p := &Pool{
|
||||
innerPools: []*innerPool{inner},
|
||||
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
||||
}
|
||||
|
||||
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
|
||||
|
||||
inner.lock.RLock()
|
||||
defer inner.lock.RUnlock()
|
||||
|
|
|
@ -1,82 +0,0 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
circuitBreaker struct {
|
||||
breakDuration time.Duration
|
||||
threshold int
|
||||
|
||||
mu sync.RWMutex
|
||||
state map[uint64]state
|
||||
}
|
||||
|
||||
state struct {
|
||||
counter int
|
||||
breakTimestamp time.Time
|
||||
}
|
||||
)
|
||||
|
||||
var ErrCBClosed = errors.New("circuit breaker is closed")
|
||||
|
||||
func newCircuitBreaker(breakDuration time.Duration, threshold int) *circuitBreaker {
|
||||
return &circuitBreaker{
|
||||
breakDuration: breakDuration,
|
||||
threshold: threshold,
|
||||
state: make(map[uint64]state),
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *circuitBreaker) checkBreak(id uint64) error {
|
||||
cb.mu.RLock()
|
||||
s, ok := cb.state[id]
|
||||
cb.mu.RUnlock()
|
||||
|
||||
if ok && time.Since(s.breakTimestamp) < cb.breakDuration {
|
||||
return ErrCBClosed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cb *circuitBreaker) openBreak(id uint64) {
|
||||
cb.mu.Lock()
|
||||
defer cb.mu.Unlock()
|
||||
delete(cb.state, id)
|
||||
}
|
||||
|
||||
func (cb *circuitBreaker) incError(id uint64) {
|
||||
cb.mu.Lock()
|
||||
defer cb.mu.Unlock()
|
||||
|
||||
s := cb.state[id]
|
||||
|
||||
s.counter++
|
||||
if s.counter >= cb.threshold {
|
||||
s.counter = cb.threshold
|
||||
if time.Since(s.breakTimestamp) >= cb.breakDuration {
|
||||
s.breakTimestamp = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
cb.state[id] = s
|
||||
}
|
||||
|
||||
func (cb *circuitBreaker) Do(id uint64, f func() error) error {
|
||||
if err := cb.checkBreak(id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := f()
|
||||
if err == nil {
|
||||
cb.openBreak(id)
|
||||
} else {
|
||||
cb.incError(id)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCircuitBreaker(t *testing.T) {
|
||||
remoteErr := errors.New("service is being synchronized")
|
||||
breakDuration := 1 * time.Second
|
||||
threshold := 10
|
||||
cb := newCircuitBreaker(breakDuration, threshold)
|
||||
|
||||
// Hit threshold
|
||||
for i := 0; i < threshold; i++ {
|
||||
err := cb.Do(1, func() error { return remoteErr })
|
||||
require.ErrorIs(t, err, remoteErr)
|
||||
}
|
||||
|
||||
// Different client should not be affected by threshold
|
||||
require.NoError(t, cb.Do(2, func() error { return nil }))
|
||||
|
||||
// Immediate request should return circuit breaker error
|
||||
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
|
||||
|
||||
// Request after breakDuration should be ok
|
||||
time.Sleep(breakDuration)
|
||||
require.NoError(t, cb.Do(1, func() error { return nil }))
|
||||
|
||||
// Try hitting threshold one more time after break duration
|
||||
for i := 0; i < threshold; i++ {
|
||||
err := cb.Do(1, func() error { return remoteErr })
|
||||
require.ErrorIs(t, err, remoteErr)
|
||||
}
|
||||
|
||||
// Immediate request should return circuit breaker error
|
||||
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
|
||||
}
|
||||
|
||||
func TestCircuitBreakerNoBlock(t *testing.T) {
|
||||
remoteErr := errors.New("service is being synchronized")
|
||||
funcDuration := 200 * time.Millisecond
|
||||
threshold := 100
|
||||
cb := newCircuitBreaker(10*funcDuration, threshold)
|
||||
|
||||
slowFunc := func() error {
|
||||
time.Sleep(funcDuration)
|
||||
return remoteErr
|
||||
}
|
||||
|
||||
for i := 0; i < threshold; i++ {
|
||||
// run in multiple goroutines Do function
|
||||
go func() {
|
||||
cb.Do(1, slowFunc)
|
||||
}()
|
||||
}
|
||||
|
||||
time.Sleep(funcDuration)
|
||||
|
||||
// eventually at most after one more func duration circuit breaker will be
|
||||
// closed and not blocked by slow func execution under mutex
|
||||
require.Eventually(t, func() bool {
|
||||
return errors.Is(cb.Do(1, func() error { return nil }), ErrCBClosed)
|
||||
}, funcDuration, funcDuration/10)
|
||||
}
|
|
@ -24,12 +24,10 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultRebalanceInterval = 15 * time.Second
|
||||
defaultHealthcheckTimeout = 4 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultStreamTimeout = 10 * time.Second
|
||||
defaultCircuitBreakerDuration = 10 * time.Second
|
||||
defaultCircuitBreakerTreshold = 10
|
||||
defaultRebalanceInterval = 15 * time.Second
|
||||
defaultHealthcheckTimeout = 4 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultStreamTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// SubTreeSort defines an order of nodes returned from GetSubTree RPC.
|
||||
|
@ -78,8 +76,6 @@ type InitParameters struct {
|
|||
dialOptions []grpc.DialOption
|
||||
maxRequestAttempts int
|
||||
netMapInfoSource NetMapInfoSource
|
||||
circuitBreakerThreshold int
|
||||
circuitBreakerDuration time.Duration
|
||||
}
|
||||
|
||||
type NetMapInfoSource interface {
|
||||
|
@ -121,8 +117,6 @@ type Pool struct {
|
|||
// * retry in case of request failure (see Pool.requestWithRetry)
|
||||
// startIndices will be used if netMapInfoSource is not set
|
||||
startIndices [2]int
|
||||
// circuit breaker for dial operations when netmap is being used
|
||||
cb *circuitBreaker
|
||||
}
|
||||
|
||||
type innerPool struct {
|
||||
|
@ -254,10 +248,6 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
methods: methods,
|
||||
netMapInfoSource: options.netMapInfoSource,
|
||||
clientMap: make(map[uint64]client),
|
||||
cb: newCircuitBreaker(
|
||||
options.circuitBreakerDuration,
|
||||
options.circuitBreakerThreshold,
|
||||
),
|
||||
}
|
||||
|
||||
if options.netMapInfoSource == nil {
|
||||
|
@ -376,18 +366,6 @@ func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource)
|
|||
x.netMapInfoSource = netMapInfoSource
|
||||
}
|
||||
|
||||
// SetCircuitBreakerThreshold sets number of consecutive failed connection before
|
||||
// circuit is considered closed and therefore return error immediately.
|
||||
func (x *InitParameters) SetCircuitBreakerThreshold(circuitBreakerThreshold int) {
|
||||
x.circuitBreakerThreshold = circuitBreakerThreshold
|
||||
}
|
||||
|
||||
// SetCircuitBreakerDuration sets duration for circuit to be considered closed.
|
||||
// This effectively limits to one new connection try per duration.
|
||||
func (x *InitParameters) SetCircuitBreakerDuration(circuitBreakerDuration time.Duration) {
|
||||
x.circuitBreakerDuration = circuitBreakerDuration
|
||||
}
|
||||
|
||||
// GetNodes invokes eponymous method from TreeServiceClient.
|
||||
//
|
||||
// Can return predefined errors:
|
||||
|
@ -436,19 +414,12 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNod
|
|||
//
|
||||
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
|
||||
type SubTreeReader struct {
|
||||
cli *rpcapi.GetSubTreeResponseReader
|
||||
probe *tree.GetSubTreeResponseBody
|
||||
cli *rpcapi.GetSubTreeResponseReader
|
||||
}
|
||||
|
||||
// Read reads another list of the subtree nodes.
|
||||
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
|
||||
i := 0
|
||||
if x.probe != nil && len(buf) != 0 {
|
||||
buf[0] = x.probe
|
||||
x.probe = nil
|
||||
i = 1
|
||||
}
|
||||
for ; i < len(buf); i++ {
|
||||
for i := range buf {
|
||||
var resp tree.GetSubTreeResponse
|
||||
err := x.cli.Read(&resp)
|
||||
if err == io.EOF {
|
||||
|
@ -465,10 +436,6 @@ func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
|
|||
// ReadAll reads all nodes subtree nodes.
|
||||
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
|
||||
var res []*tree.GetSubTreeResponseBody
|
||||
if x.probe != nil {
|
||||
res = append(res, x.probe)
|
||||
x.probe = nil
|
||||
}
|
||||
for {
|
||||
var resp tree.GetSubTreeResponse
|
||||
err := x.cli.Read(&resp)
|
||||
|
@ -485,12 +452,6 @@ func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
|
|||
|
||||
// Next gets the next node from subtree.
|
||||
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
|
||||
if x.probe != nil {
|
||||
res := x.probe
|
||||
x.probe = nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
var resp tree.GetSubTreeResponse
|
||||
err := x.cli.Read(&resp)
|
||||
if err == io.EOF {
|
||||
|
@ -534,24 +495,16 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
|||
}
|
||||
|
||||
var cli *rpcapi.GetSubTreeResponseReader
|
||||
var probeBody *tree.GetSubTreeResponseBody
|
||||
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
|
||||
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
|
||||
if inErr != nil {
|
||||
return handleError("failed to get sub tree client", inErr)
|
||||
}
|
||||
|
||||
probe := &tree.GetSubTreeResponse{}
|
||||
inErr = cli.Read(probe)
|
||||
probeBody = probe.GetBody()
|
||||
return handleError("failed to get first resp from sub tree client", inErr)
|
||||
return handleError("failed to get sub tree client", inErr)
|
||||
})
|
||||
p.methods[methodGetSubTree].IncRequests(time.Since(start))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SubTreeReader{cli: cli, probe: probeBody}, nil
|
||||
return &SubTreeReader{cli: cli}, nil
|
||||
}
|
||||
|
||||
// AddNode invokes eponymous method from TreeServiceClient.
|
||||
|
@ -811,14 +764,6 @@ func fillDefaultInitParams(params *InitParameters) {
|
|||
if params.maxRequestAttempts <= 0 {
|
||||
params.maxRequestAttempts = len(params.nodeParams)
|
||||
}
|
||||
|
||||
if params.circuitBreakerDuration <= 0 {
|
||||
params.circuitBreakerDuration = defaultCircuitBreakerDuration
|
||||
}
|
||||
|
||||
if params.circuitBreakerThreshold <= 0 {
|
||||
params.circuitBreakerThreshold = defaultCircuitBreakerTreshold
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||
|
@ -1014,17 +959,14 @@ LOOP:
|
|||
|
||||
treeCl, ok := p.getClientFromMap(cnrNode.Hash())
|
||||
if !ok {
|
||||
err = p.cb.Do(cnrNode.Hash(), func() error {
|
||||
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
|
||||
return err
|
||||
})
|
||||
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
|
||||
if err != nil {
|
||||
finErr = finalError(finErr, err)
|
||||
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
|
||||
continue
|
||||
}
|
||||
|
||||
treeCl = p.addClientToMap(cnrNode.Hash(), treeCl)
|
||||
p.addClientToMap(cnrNode.Hash(), treeCl)
|
||||
}
|
||||
attempts--
|
||||
|
||||
|
@ -1059,24 +1001,16 @@ func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
|
|||
return cl, ok
|
||||
}
|
||||
|
||||
func (p *Pool) addClientToMap(hash uint64, cl client) client {
|
||||
func (p *Pool) addClientToMap(hash uint64, cl client) {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
|
||||
if old, ok := p.clientMap[hash]; ok {
|
||||
_ = cl.close()
|
||||
return old
|
||||
}
|
||||
p.clientMap[hash] = cl
|
||||
return cl
|
||||
p.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (p *Pool) deleteClientFromMap(hash uint64) {
|
||||
p.mutex.Lock()
|
||||
if cli, ok := p.clientMap[hash]; ok {
|
||||
_ = cli.close()
|
||||
delete(p.clientMap, hash)
|
||||
}
|
||||
_ = p.clientMap[hash].close()
|
||||
delete(p.clientMap, hash)
|
||||
p.mutex.Unlock()
|
||||
}
|
||||
|
||||
|
@ -1096,16 +1030,6 @@ func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*tre
|
|||
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
|
||||
if err = newTreeCl.dial(ctx); err != nil {
|
||||
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
|
||||
|
||||
// We have to close connection here after failed `dial()`.
|
||||
// This is NOT necessary in object pool and regular tree pool without netmap support, because:
|
||||
// - object pool uses SDK object client which closes connection during `dial()` call by itself,
|
||||
// - regular tree pool is going to reuse connection by calling `redialIfNecessary()`.
|
||||
// Tree pool with netmap support does not operate with background goroutine, so we have to close connection immediately.
|
||||
if err = newTreeCl.close(); err != nil {
|
||||
p.log(zap.WarnLevel, "failed to close recently dialed tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -1,345 +0,0 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
|
||||
apitree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type mockTreeServer struct {
|
||||
id int
|
||||
srv *grpc.Server
|
||||
lis net.Listener
|
||||
key *keys.PrivateKey
|
||||
|
||||
healthy bool
|
||||
addCounter int
|
||||
|
||||
getSubTreeError error
|
||||
getSubTreeResponses []*tree.GetSubTreeResponse_Body
|
||||
getSubTreeCounter int
|
||||
}
|
||||
|
||||
type mockNetmapSource struct {
|
||||
servers []*mockTreeServer
|
||||
policy string
|
||||
}
|
||||
|
||||
func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
|
||||
nm := netmap.NetMap{}
|
||||
nodes := make([]netmap.NodeInfo, len(m.servers))
|
||||
for i, server := range m.servers {
|
||||
ni := apinetmap.NodeInfo{}
|
||||
ni.SetAddresses(server.lis.Addr().String())
|
||||
ni.SetPublicKey(server.key.PublicKey().Bytes())
|
||||
err := nodes[i].ReadFromV2(ni) // no other way to set address field in netmap.NodeInfo
|
||||
if err != nil {
|
||||
return nm, err
|
||||
}
|
||||
nodes[i].SetAttribute("id", strconv.Itoa(server.id))
|
||||
}
|
||||
nm.SetNodes(nodes)
|
||||
return nm, nil
|
||||
}
|
||||
|
||||
func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
|
||||
p := netmap.PlacementPolicy{}
|
||||
return p, p.DecodeString(m.policy)
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Serve() {
|
||||
go m.srv.Serve(m.lis)
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Stop() {
|
||||
m.srv.Stop()
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Addr() string {
|
||||
return m.lis.Addr().String()
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) {
|
||||
m.addCounter++
|
||||
return &tree.AddResponse{}, nil
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) GetSubTree(_ *tree.GetSubTreeRequest, s tree.TreeService_GetSubTreeServer) error {
|
||||
m.getSubTreeCounter++
|
||||
|
||||
if m.getSubTreeError != nil {
|
||||
return m.getSubTreeError
|
||||
}
|
||||
|
||||
for i := range m.getSubTreeResponses {
|
||||
if err := s.Send(&tree.GetSubTreeResponse{
|
||||
Body: m.getSubTreeResponses[i],
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) {
|
||||
if m.healthy {
|
||||
return new(tree.HealthcheckResponse), nil
|
||||
}
|
||||
return nil, errors.New("not healthy")
|
||||
}
|
||||
|
||||
func createTestServer(t *testing.T, id int) *mockTreeServer {
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
res := &mockTreeServer{
|
||||
id: id,
|
||||
srv: grpc.NewServer(),
|
||||
lis: lis,
|
||||
key: key,
|
||||
healthy: true,
|
||||
}
|
||||
|
||||
tree.RegisterTreeServiceServer(res.srv, res)
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) {
|
||||
poolInitParams := InitParameters{}
|
||||
|
||||
servers := make([]*mockTreeServer, n)
|
||||
for i := range servers {
|
||||
servers[i] = createTestServer(t, i)
|
||||
servers[i].healthy = true
|
||||
servers[i].Serve()
|
||||
poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1))
|
||||
}
|
||||
|
||||
source := &mockNetmapSource{
|
||||
servers: servers,
|
||||
policy: p,
|
||||
}
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
poolInitParams.SetKey(key)
|
||||
poolInitParams.SetNetMapInfoSource(source)
|
||||
|
||||
cli, err := NewPool(poolInitParams)
|
||||
require.NoError(t, err)
|
||||
|
||||
return cli, servers, source
|
||||
}
|
||||
|
||||
func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) {
|
||||
res := make([]*mockTreeServer, len(servers))
|
||||
snapshot, err := source.NetMapSnapshot(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
policy, err := source.PlacementPolicy(ctx, cnr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// find servers based on public key and store pointers in res
|
||||
index := 0
|
||||
for i := range priorityNodes {
|
||||
for j := range priorityNodes[i] {
|
||||
key := priorityNodes[i][j].PublicKey()
|
||||
for k := range servers {
|
||||
if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) {
|
||||
res[index] = servers[k]
|
||||
index++
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func TestConnectionLeak(t *testing.T) {
|
||||
const (
|
||||
numberOfNodes = 4
|
||||
placementPolicy = "REP 2"
|
||||
)
|
||||
|
||||
// Initialize gRPC servers and create pool with netmap source
|
||||
treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
|
||||
for i := range servers {
|
||||
defer servers[i].Stop()
|
||||
}
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ctx := context.Background()
|
||||
|
||||
// Make priority node for cnr unhealthy, so it is going to be redialled on every request
|
||||
sortedServers, err := sortServers(ctx, servers, source, cnr)
|
||||
require.NoError(t, err)
|
||||
sortedServers[0].healthy = false
|
||||
|
||||
// Make RPC and check that pool switched to healthy server
|
||||
_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy
|
||||
require.Equal(t, 1, sortedServers[1].addCounter) // healthy
|
||||
|
||||
// Check that go routines are not leaked during multiple requests
|
||||
routinesBefore := runtime.NumGoroutine()
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// not more than 1 extra goroutine is created due to async operations
|
||||
require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1)
|
||||
}
|
||||
|
||||
func TestStreamRetry(t *testing.T) {
|
||||
const (
|
||||
numberOfNodes = 4
|
||||
placementPolicy = "REP 2"
|
||||
)
|
||||
|
||||
expected := []*tree.GetSubTreeResponse_Body{
|
||||
{
|
||||
NodeId: []uint64{1},
|
||||
},
|
||||
{
|
||||
NodeId: []uint64{2},
|
||||
},
|
||||
{
|
||||
NodeId: []uint64{3},
|
||||
},
|
||||
}
|
||||
|
||||
// Initialize gRPC servers and create pool with netmap source
|
||||
treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
|
||||
defer func() {
|
||||
for i := range servers {
|
||||
servers[i].Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ctx := context.Background()
|
||||
|
||||
sortedServers, err := sortServers(ctx, servers, source, cnr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Return expected response in last priority node, others return error
|
||||
for i := range sortedServers {
|
||||
if i == len(sortedServers)-1 {
|
||||
sortedServers[i].getSubTreeResponses = expected
|
||||
} else {
|
||||
sortedServers[i].getSubTreeError = errors.New("tree not found")
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("read all", func(t *testing.T) {
|
||||
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
|
||||
require.NoError(t, err)
|
||||
|
||||
data, err := reader.ReadAll()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, data, len(expected))
|
||||
for i := range expected {
|
||||
require.EqualValues(t, expected[i].GetNodeId(), data[i].GetNodeID())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("next", func(t *testing.T) {
|
||||
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := range expected {
|
||||
resp, err := reader.Next()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected[i].GetNodeId(), resp.GetNodeID())
|
||||
}
|
||||
|
||||
_, err = reader.Next()
|
||||
require.Error(t, io.EOF, err)
|
||||
})
|
||||
|
||||
t.Run("read", func(t *testing.T) {
|
||||
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
|
||||
require.NoError(t, err)
|
||||
|
||||
buf := make([]*apitree.GetSubTreeResponseBody, len(expected))
|
||||
_, err = reader.Read(buf)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, buf, len(expected))
|
||||
for i := range expected {
|
||||
require.EqualValues(t, expected[i].GetNodeId(), buf[i].GetNodeID())
|
||||
}
|
||||
})
|
||||
|
||||
for i := range servers {
|
||||
// check we retried every available node in the pool three times
|
||||
require.Equal(t, 3, servers[i].getSubTreeCounter)
|
||||
}
|
||||
}
|
|
@ -10,7 +10,6 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
netmaptest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
|
@ -417,21 +416,6 @@ func TestRetryContainerNodes(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestDeleteClientTwice(t *testing.T) {
|
||||
p := Pool{
|
||||
clientMap: makeClientMap([]netmap.NodeInfo{netmaptest.NodeInfo()}),
|
||||
}
|
||||
// emulate concurrent requests as consecutive requests
|
||||
// to delete the same client from the map twice
|
||||
for idToDelete := range p.clientMap {
|
||||
p.deleteClientFromMap(idToDelete)
|
||||
require.NotPanics(t, func() {
|
||||
p.deleteClientFromMap(idToDelete)
|
||||
})
|
||||
}
|
||||
require.Empty(t, p.clientMap)
|
||||
}
|
||||
|
||||
func makeInnerPool(nodes [][]string) []*innerPool {
|
||||
res := make([]*innerPool, len(nodes))
|
||||
|
||||
|
|
84
user/id.go
84
user/id.go
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
"github.com/mr-tron/base58"
|
||||
|
@ -13,8 +12,9 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
)
|
||||
|
||||
// idFullSize is the size of ID in bytes, including prefix and checksum.
|
||||
const idFullSize = util.Uint160Size + 5
|
||||
const idSize = 25
|
||||
|
||||
var zeroSlice = bytes.Repeat([]byte{0}, idSize)
|
||||
|
||||
// ID identifies users of the FrostFS system.
|
||||
//
|
||||
|
@ -25,7 +25,7 @@ const idFullSize = util.Uint160Size + 5
|
|||
// so it MUST be initialized using some modifying function (e.g. SetScriptHash,
|
||||
// IDFromKey, etc.).
|
||||
type ID struct {
|
||||
w util.Uint160
|
||||
w []byte
|
||||
}
|
||||
|
||||
// ReadFromV2 reads ID from the refs.OwnerID message. Returns an error if
|
||||
|
@ -33,7 +33,22 @@ type ID struct {
|
|||
//
|
||||
// See also WriteToV2.
|
||||
func (x *ID) ReadFromV2(m refs.OwnerID) error {
|
||||
return x.setUserID(m.GetValue())
|
||||
w := m.GetValue()
|
||||
if len(w) != idSize {
|
||||
return fmt.Errorf("invalid length %d, expected %d", len(w), idSize)
|
||||
}
|
||||
|
||||
if w[0] != address.NEO3Prefix {
|
||||
return fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", w[0], address.NEO3Prefix)
|
||||
}
|
||||
|
||||
if !bytes.Equal(w[21:], hash.Checksum(w[:21])) {
|
||||
return errors.New("checksum mismatch")
|
||||
}
|
||||
|
||||
x.w = w
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteToV2 writes ID to the refs.OwnerID message.
|
||||
|
@ -41,17 +56,25 @@ func (x *ID) ReadFromV2(m refs.OwnerID) error {
|
|||
//
|
||||
// See also ReadFromV2.
|
||||
func (x ID) WriteToV2(m *refs.OwnerID) {
|
||||
m.SetValue(x.WalletBytes())
|
||||
m.SetValue(x.w)
|
||||
}
|
||||
|
||||
// SetScriptHash forms user ID from wallet address scripthash.
|
||||
func (x *ID) SetScriptHash(scriptHash util.Uint160) {
|
||||
x.w = scriptHash
|
||||
if cap(x.w) < idSize {
|
||||
x.w = make([]byte, idSize)
|
||||
} else if len(x.w) < idSize {
|
||||
x.w = x.w[:idSize]
|
||||
}
|
||||
|
||||
x.w[0] = address.Prefix
|
||||
copy(x.w[1:], scriptHash.BytesBE())
|
||||
copy(x.w[21:], hash.Checksum(x.w[:21]))
|
||||
}
|
||||
|
||||
// ScriptHash calculates and returns script hash of ID.
|
||||
func (x *ID) ScriptHash() util.Uint160 {
|
||||
return x.w
|
||||
func (x *ID) ScriptHash() (util.Uint160, error) {
|
||||
return util.Uint160DecodeBytesBE(x.w[1:21])
|
||||
}
|
||||
|
||||
// WalletBytes returns FrostFS user ID as Neo3 wallet address in a binary format.
|
||||
|
@ -60,18 +83,14 @@ func (x *ID) ScriptHash() util.Uint160 {
|
|||
//
|
||||
// See also Neo3 wallet docs.
|
||||
func (x ID) WalletBytes() []byte {
|
||||
v := make([]byte, idFullSize)
|
||||
v[0] = address.Prefix
|
||||
copy(v[1:], x.w[:])
|
||||
copy(v[21:], hash.Checksum(v[:21]))
|
||||
return v
|
||||
return x.w
|
||||
}
|
||||
|
||||
// EncodeToString encodes ID into FrostFS API V2 protocol string.
|
||||
//
|
||||
// See also DecodeString.
|
||||
func (x ID) EncodeToString() string {
|
||||
return base58.Encode(x.WalletBytes())
|
||||
return base58.Encode(x.w)
|
||||
}
|
||||
|
||||
// DecodeString decodes FrostFS API V2 protocol string. Returns an error
|
||||
|
@ -81,11 +100,14 @@ func (x ID) EncodeToString() string {
|
|||
//
|
||||
// See also EncodeToString.
|
||||
func (x *ID) DecodeString(s string) error {
|
||||
w, err := base58.Decode(s)
|
||||
var err error
|
||||
|
||||
x.w, err = base58.Decode(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode base58: %w", err)
|
||||
}
|
||||
return x.setUserID(w)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// String implements fmt.Stringer.
|
||||
|
@ -99,34 +121,10 @@ func (x ID) String() string {
|
|||
|
||||
// Equals defines a comparison relation between two ID instances.
|
||||
func (x ID) Equals(x2 ID) bool {
|
||||
return x.w == x2.w
|
||||
return bytes.Equal(x.w, x2.w)
|
||||
}
|
||||
|
||||
// IsEmpty returns True, if ID is empty value.
|
||||
func (x ID) IsEmpty() bool {
|
||||
return x.w == util.Uint160{}
|
||||
}
|
||||
|
||||
func (x *ID) setUserID(w []byte) error {
|
||||
if len(w) != idFullSize {
|
||||
return fmt.Errorf("invalid length %d, expected %d", len(w), idFullSize)
|
||||
}
|
||||
|
||||
if w[0] != address.NEO3Prefix {
|
||||
return fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", w[0], address.NEO3Prefix)
|
||||
}
|
||||
|
||||
if !bytes.Equal(w[21:], hash.Checksum(w[:21])) {
|
||||
return errors.New("checksum mismatch")
|
||||
}
|
||||
|
||||
copy(x.w[:], w[1:21])
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cmp returns an integer comparing two base58 encoded user ID lexicographically.
|
||||
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
|
||||
func (x ID) Cmp(x2 ID) int {
|
||||
return strings.Compare(x.EncodeToString(), x2.EncodeToString())
|
||||
return bytes.Equal(zeroSlice, x.w)
|
||||
}
|
||||
|
|
|
@ -3,8 +3,6 @@ package user_test
|
|||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
|
@ -53,7 +51,8 @@ func TestID_SetScriptHash(t *testing.T) {
|
|||
func TestID_ScriptHash(t *testing.T) {
|
||||
userID := usertest.ID()
|
||||
|
||||
scriptHash := userID.ScriptHash()
|
||||
scriptHash, err := userID.ScriptHash()
|
||||
require.NoError(t, err)
|
||||
|
||||
ownerAddress := userID.EncodeToString()
|
||||
decodedScriptHash, err := address.StringToUint160(ownerAddress)
|
||||
|
@ -134,16 +133,3 @@ func TestID_Equal(t *testing.T) {
|
|||
require.True(t, id3.Equals(id1)) // commutativity
|
||||
require.False(t, id1.Equals(id2))
|
||||
}
|
||||
|
||||
func TestID_Cmp(t *testing.T) {
|
||||
id1 := usertest.ID()
|
||||
id2 := usertest.ID()
|
||||
id3 := usertest.ID()
|
||||
|
||||
arr := []ID{id1, id2, id3}
|
||||
|
||||
slices.SortFunc(arr, ID.Cmp)
|
||||
for i := 1; i < len(arr); i++ {
|
||||
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue