Compare commits


36 commits

Author SHA1 Message Date
a262a0038f [#343] pool: Fix Yoda condition
go-staticcheck recommends not using Yoda conditions (see the sketch below).

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-10 19:24:58 +03:00
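A minimal illustration of the rule (not part of this diff; ST1017 is staticcheck's "don't use Yoda conditions" check):

package main

import "fmt"

func describe(err error) string {
	// A Yoda condition puts the constant on the left, which
	// staticcheck (ST1017) flags:
	//   if nil == err { ... }
	// The preferred order puts the variable first:
	if err == nil {
		return "ok"
	}
	return fmt.Sprintf("failed: %v", err)
}

func main() {
	fmt.Println(describe(nil))
}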
fe5b28e6bf [#338] pool: Support avg request time for ListContainerStream
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-10 19:24:58 +03:00
749b4e9ab5 [#344] netmap: Add method Clone
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-07 18:02:02 +03:00
f70c0c9081 [#300] pool: Remove obvious comments
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
17177697b5 [#300] pool/cm: Remove unused mutex in 'statistics'
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
56046935b0 [#300] pool: Move 'clientWrapper' to separate file
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
cee7f9de47 [#300] pool: Move 'connectionManager' to separate file
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
d77a8742bc [#300] pool: Move 'healthCheck' to separate file
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
b083bf8546 [#300] pool: Extract healthCheck functionality from 'connectionManager' to 'healthCheck'
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
b480df99ca [#300] pool: Extract connection handler functionality to 'connectionManager'
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-03-07 11:45:31 +00:00
69b0711d12 [#342] client: Disable service config query
By default, gRPC fetches a TXT record while resolving a domain.
0914bba6c5/internal/resolver/dns/dns_resolver.go (L336)

This leads to a hanging dial if DNS is unavailable, even though the host
may be specified in `/etc/hosts` (hello, localhost!).

Use `grpc.WithDisableServiceConfig()` to override the default.
This option does not seem to be overridable via `WithGRPCDialOpts()`,
but we do not use the service config anyway (see the sketch below).

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-06 09:24:16 +00:00
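A minimal sketch (not from this diff) of what the resulting dial options look like; grpc.NewClient and the target address are used purely for illustration:

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Skip the service-config TXT lookup during name resolution so a
	// dial cannot hang on an unreachable DNS server.
	conn, err := grpc.NewClient("localhost:8080",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDisableServiceConfig(),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}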
7a37613988 [#339] pool/tree: Improve code after review
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-05 14:40:45 +03:00
d592bb931e [#339] pool/tree: Configure circuit breaker parameters
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-04 14:49:07 +03:00
06ef257ddc [#339] pool/tree: Do not lock mutex on circuit break function
The circuit-break function may take some time to execute, so it should
not run while the lock is held (see the sketch below).

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-04 14:41:01 +03:00
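A minimal sketch of the pattern, with hypothetical names: state is read and written under the mutex, while the possibly slow function runs with no lock held:

package main

import (
	"errors"
	"sync"
	"time"
)

var errBroken = errors.New("circuit is broken")

type breaker struct {
	mu     sync.RWMutex
	broken bool
}

func (b *breaker) do(f func() error) error {
	// Read shared state under the lock, then release it before
	// running f, which may be slow.
	b.mu.RLock()
	broken := b.broken
	b.mu.RUnlock()
	if broken {
		return errBroken
	}
	err := f() // runs with no lock held
	// Re-acquire the lock only to update state.
	b.mu.Lock()
	b.broken = err != nil
	b.mu.Unlock()
	return err
}

func main() {
	var b breaker
	_ = b.do(func() error { time.Sleep(10 * time.Millisecond); return nil })
}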
2d08fa5240 [#339] pool/tree: Close replaced connection in client map
There is a race condition: multiple clients are created and dialled,
but only one is stored in the map. The others remain active but are
never used.

With this change, the new connection replaces the old one and closes
it (see the sketch below).

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-04 14:41:01 +03:00
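A minimal sketch of the replace-and-close idea, with hypothetical types: the map always keeps the newest connection, and whatever it displaces gets closed instead of leaking:

package main

import "sync"

type conn struct{ addr string }

func (c *conn) close() {}

type clientMap struct {
	mu sync.Mutex
	m  map[uint64]*conn
}

// store inserts the freshly dialled connection and closes any
// connection it replaces, so losers of the dial race do not leak.
func (cm *clientMap) store(id uint64, c *conn) {
	cm.mu.Lock()
	old := cm.m[id]
	cm.m[id] = c
	cm.mu.Unlock()
	if old != nil {
		old.close()
	}
}

func main() {
	cm := clientMap{m: make(map[uint64]*conn)}
	cm.store(1, &conn{addr: "peer0"})
	cm.store(1, &conn{addr: "peer0"}) // replaces and closes the first
}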
c5991fc66d [#339] pool/tree: Fix linter issues
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-04 12:22:38 +03:00
f78fb6dcb0 [#339] pool/tree: Make circuit breaker more generic
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-04 12:21:11 +03:00
c8d71c450a [#339] pool/tree: Add circuit breaker
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-03-04 12:20:58 +03:00
2b8329e026 [#336] pool/tree: Increase test coverage in TestStreamRetry
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-02-28 12:32:56 +03:00
ada0513504 [#336] pool/tree: Do probe in getSubTree to handle error in advance
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-02-28 10:46:01 +03:00
c3f7378887 [#294] user: Implement Cmp() function for ID struct
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-02-17 15:22:55 +00:00
bcb5fd22d4 [#294] container: Implement Cmp() function for ID struct
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-02-17 15:22:55 +00:00
148a341b6f [#294] object: Implement Cmp() function for ID struct
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-02-17 15:22:55 +00:00
56892e48ac [#334] rpc: Use DeadlineExceeded error when creating stream failed by timeout
According to https://pkg.go.dev/context#pkg-variables, context.Canceled
should be returned when the context is canceled for some reason other
than its deadline passing. A gRPC stream creation that fails because the
dial timeout fired is therefore better reported as
context.DeadlineExceeded (see the sketch below).

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-02-17 16:06:11 +03:00
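A minimal sketch of the error mapping (hypothetical helper, not this repo's code): when the dial timer fires before the dial finishes, the failure is time-based, so context.DeadlineExceeded is the better sentinel:

package main

import (
	"context"
	"fmt"
	"time"
)

func dialWithTimeout(dial func() error, timeout time.Duration) error {
	t := time.NewTimer(timeout)
	defer t.Stop()

	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { done <- dial() }()

	select {
	case <-t.C:
		// The deadline passed: report DeadlineExceeded, not Canceled.
		return context.DeadlineExceeded
	case err := <-done:
		return err
	}
}

func main() {
	err := dialWithTimeout(func() error {
		time.Sleep(time.Second)
		return nil
	}, 100*time.Millisecond)
	fmt.Println(err) // context deadline exceeded
}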
7bdc78f2b5 [#332] client/status: Support RESOURCE_EXHAUSTED status
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2025-02-06 11:38:57 +03:00
cf0bbd03ae [#332] api/status: Regenerate common statuses
Introduced a new common status `RESOURCE_EXHAUSTED`.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2025-02-06 11:38:45 +03:00
5a35fa4353 [#331] pool: Fix 'sortServers' in tree pool server test
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-02-05 10:44:03 +03:00
4ecbfb0edf [#331] pool: Add mocked test tree service and check goroutine leak
Use a real gRPC connection in the new mocked tree service.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-02-04 21:15:15 +03:00
f08a7f0b3c [#331] pool: Avoid connection leak in tree pool with netmap support
To avoid a connection leak, call `close()` immediately after the
connection is established. In the regular tree pool, unhealthy
connections are handled by a background goroutine that calls
`redialIfNecessary()` to reestablish the connection. That is not viable
here, so the connection must be closed right away (see the sketch below).

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-02-04 21:15:10 +03:00
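A minimal sketch of the idea, with hypothetical names: when no background redial loop will ever adopt a bad connection, the dialling code closes it on the spot:

package main

import (
	"context"
	"errors"
	"fmt"
)

type treeClient struct{ addr string }

func (c *treeClient) dial(ctx context.Context) error { return nil }
func (c *treeClient) close() error                   { return nil }
func (c *treeClient) healthy() bool                  { return false }

func connect(ctx context.Context, addr string) (*treeClient, error) {
	c := &treeClient{addr: addr}
	if err := c.dial(ctx); err != nil {
		return nil, err
	}
	if !c.healthy() {
		_ = c.close() // no redial loop here, so close immediately
		return nil, errors.New("unhealthy connection")
	}
	return c, nil
}

func main() {
	_, err := connect(context.Background(), "peer0")
	fmt.Println(err)
}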
8389887a34 [#319] object/transformer: Add expiration epoch to each part
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2025-02-02 18:14:21 +03:00
593dd77d84 [#327] rpc: Fix mem leak
The gRPC stream must be closed via `cancel` to prevent a memory leak
(see the sketch below).

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-30 12:53:43 +03:00
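A minimal sketch of the ownership rule, modeled with plain contexts: the stream lives as long as its context, so the cancel function must run once the caller is done with it:

package main

import (
	"context"
	"fmt"
	"time"
)

// openStream models how a gRPC stream is bound to a context: the
// returned cancel func is what releases the stream's resources.
func openStream(parent context.Context) (context.Context, context.CancelFunc) {
	return context.WithCancel(parent)
}

func main() {
	ctx, cancel := openStream(context.Background())
	// Without this cancel, goroutines and buffers tied to the
	// stream's context are never released: the kind of leak
	// described above.
	defer cancel()

	select {
	case <-ctx.Done():
	case <-time.After(10 * time.Millisecond):
		fmt.Println("done reading; cancel releases the stream")
	}
}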
2786fadb25 [#326] pool: Add test for concurrent client deletion
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-01-29 20:16:59 +03:00
37350dbb1e [#326] pool: Fix panic that causes mutex deadlock
Two concurrent 'deleteClientFromMap' calls for
the same client may cause a panic and a deadlock.

The first goroutine acquires the lock, removes the element
from the map, and releases the lock.

The second goroutine acquires the lock and panics
while trying to call 'close()' on an empty struct.
The lock is never released, which deadlocks the
other goroutines (see the sketch below).

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-01-29 20:14:55 +03:00
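A minimal sketch of the fixed pattern, with hypothetical types: a deferred unlock survives a panic, and a presence check turns the second concurrent delete into a no-op:

package main

import (
	"fmt"
	"sync"
)

type client struct{ addr string }

func (c *client) close() {}

type pool struct {
	mu      sync.Mutex
	clients map[uint64]*client
}

// deleteClient is safe to call concurrently for the same id: the
// deferred unlock runs even if close panics, and the presence check
// makes the second call a no-op.
func (p *pool) deleteClient(id uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	c, ok := p.clients[id]
	if !ok {
		return // already removed by a concurrent call
	}
	delete(p.clients, id)
	c.close()
}

func main() {
	p := pool{clients: map[uint64]*client{1: {addr: "peer0"}}}
	var wg sync.WaitGroup
	for range 2 {
		wg.Add(1)
		go func() { defer wg.Done(); p.deleteClient(1) }()
	}
	wg.Wait()
	fmt.Println("no panic, no deadlock")
}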
d195cb5104 [#324] rpc: Fix mem leak
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-29 16:34:30 +03:00
00cebd297f [#303] user: Make user.ID a util.Uint160
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-01-28 18:01:21 +03:00
761d087b93 [#302] user: Make ScriptHash return no error
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-01-27 19:13:52 +03:00
35 changed files with 2867 additions and 1813 deletions


@ -1,6 +1,9 @@
package netmap
import (
"bytes"
"slices"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
)
@ -382,6 +385,18 @@ func (a *Attribute) SetParents(parent []string) {
a.parents = parent
}
// Clone returns a copy of Attribute.
func (a *Attribute) Clone() *Attribute {
if a == nil {
return nil
}
return &Attribute{
parents: slices.Clone(a.parents),
value: a.value,
key: a.key,
}
}
func (ni *NodeInfo) GetPublicKey() []byte {
if ni != nil {
return ni.publicKey
@ -465,6 +480,23 @@ func (ni *NodeInfo) SetState(state NodeState) {
ni.state = state
}
// Clone returns a copy of NodeInfo.
func (ni *NodeInfo) Clone() *NodeInfo {
if ni == nil {
return nil
}
dst := NodeInfo{
addresses: slices.Clone(ni.addresses),
publicKey: bytes.Clone(ni.publicKey),
state: ni.state,
attributes: make([]Attribute, len(ni.attributes)),
}
for i, v := range ni.attributes {
dst.attributes[i] = *v.Clone()
}
return &dst
}
func (l *LocalNodeInfoResponseBody) GetVersion() *refs.Version {
if l != nil {
return l.version

api/netmap/types_test.go (new file, +48 lines)

@ -0,0 +1,48 @@
package netmap
import (
"bytes"
"slices"
"testing"
"github.com/stretchr/testify/require"
)
func TestNodeInfo_Clone(t *testing.T) {
var ni NodeInfo
ni.publicKey = []byte{2}
attr := Attribute{
key: "key",
value: "value",
parents: []string{"parent", "parent2"},
}
ni.attributes = []Attribute{attr}
ni.addresses = []string{"5", "6"}
c := ni.Clone()
require.True(t, c != &ni)
require.True(t, bytes.Equal(c.publicKey, ni.publicKey))
require.True(t, &(c.publicKey[0]) != &(ni.publicKey[0]))
require.True(t, &(c.attributes[0]) != &(ni.attributes[0]))
require.True(t, slices.Compare(c.addresses, ni.addresses) == 0)
require.True(t, &(c.addresses[0]) != &(ni.addresses[0]))
}
func TestAttribute_Clone(t *testing.T) {
attr := Attribute{
key: "key",
value: "value",
parents: []string{"parent1", "parent2"},
}
c := attr.Clone()
require.True(t, c != &attr)
require.True(t, c.key == attr.key)
require.True(t, &(c.key) != &(attr.key))
require.True(t, &(c.value) != &(attr.value))
require.True(t, c.value == attr.value)
require.True(t, &(c.parents[0]) != &(attr.parents[0]))
require.True(t, slices.Compare(c.parents, attr.parents) == 0)
}


@ -12,18 +12,20 @@ import (
// SendUnary initializes communication session by RPC info, performs unary RPC
// and closes the session.
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
rw, err := cli.Init(info, opts...)
rw, err := cli.initInternal(info, opts...)
if err != nil {
return err
}
err = rw.WriteMessage(req)
if err != nil {
rw.cancel()
return err
}
err = rw.ReadMessage(resp)
if err != nil {
rw.cancel()
return err
}
@ -38,18 +40,28 @@ type MessageWriterCloser interface {
}
type clientStreamWriterCloser struct {
MessageReadWriter
sw *streamWrapper
resp message.Message
}
// WriteMessage implements MessageWriterCloser.
func (c *clientStreamWriterCloser) WriteMessage(m message.Message) error {
return c.sw.WriteMessage(m)
}
func (c *clientStreamWriterCloser) Close() error {
err := c.MessageReadWriter.Close()
err := c.sw.closeSend()
if err != nil {
c.sw.cancel()
return err
}
return c.ReadMessage(c.resp)
if err = c.sw.ReadMessage(c.resp); err != nil {
c.sw.cancel()
return err
}
return c.sw.Close()
}
// OpenClientStream initializes communication session by RPC info, opens client-side stream
@ -57,14 +69,14 @@ func (c *clientStreamWriterCloser) Close() error {
//
// All stream writes must be performed before the closing. Close must be called once.
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
rw, err := cli.Init(info, opts...)
rw, err := cli.initInternal(info, opts...)
if err != nil {
return nil, err
}
return &clientStreamWriterCloser{
MessageReadWriter: rw,
resp: resp,
sw: rw,
resp: resp,
}, nil
}
@ -76,7 +88,7 @@ type MessageReaderCloser interface {
}
type serverStreamReaderCloser struct {
rw MessageReadWriter
rw *streamWrapper
once sync.Once
@ -91,11 +103,15 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
})
if err != nil {
s.rw.cancel()
return err
}
err = s.rw.ReadMessage(msg)
if !errors.Is(err, io.EOF) {
if err != nil {
s.rw.cancel()
}
return err
}
@ -112,7 +128,7 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
//
// All stream reads must be performed before the closing. Close must be called once.
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
rw, err := cli.Init(info, opts...)
rw, err := cli.initInternal(info, opts...)
if err != nil {
return nil, err
}


@ -41,6 +41,10 @@ type MessageReadWriter interface {
// Init initiates a messaging session and returns the interface for message transmitting.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
return c.initInternal(info, opts...)
}
func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*streamWrapper, error) {
prm := defaultCallParameters()
for _, opt := range opts {
@ -61,7 +65,13 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
// would propagate to all subsequent read/write operations on the opened stream,
// which is not desired for the stream's lifecycle management.
dialTimeoutTimer := time.NewTimer(c.dialTimeout)
defer dialTimeoutTimer.Stop()
defer func() {
dialTimeoutTimer.Stop()
select {
case <-dialTimeoutTimer.C:
default:
}
}()
type newStreamRes struct {
stream grpc.ClientStream
@ -91,7 +101,7 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
if res.stream != nil && res.err == nil {
_ = res.stream.CloseSend()
}
return nil, context.Canceled
return nil, context.DeadlineExceeded
case res = <-newStreamCh:
}


@ -38,6 +38,7 @@ func (c *cfg) initDefault() {
c.rwTimeout = defaultRWTimeout
c.grpcDialOpts = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDisableServiceConfig(),
}
}


@ -34,10 +34,15 @@ func (w streamWrapper) WriteMessage(m message.Message) error {
})
}
func (w *streamWrapper) Close() error {
func (w *streamWrapper) closeSend() error {
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) Close() error {
w.cancel()
return nil
}
func (w *streamWrapper) withTimeout(closure func() error) error {
ch := make(chan error, 1)
go func() {
@ -50,6 +55,10 @@ func (w *streamWrapper) withTimeout(closure func() error) error {
select {
case err := <-ch:
tt.Stop()
select {
case <-tt.C:
default:
}
return err
case <-tt.C:
w.cancel()


@ -144,6 +144,9 @@ const (
// request parameter as the client sent it incorrectly, then this code should
// be used.
CommonFail_INVALID_ARGUMENT CommonFail = 4
// [**1029**] Resource exhausted failure. If the operation cannot be performed
// due to a lack of resources.
CommonFail_RESOURCE_EXHAUSTED CommonFail = 5
)
// Enum value maps for CommonFail.
@ -154,6 +157,7 @@ var (
2: "SIGNATURE_VERIFICATION_FAIL",
3: "NODE_UNDER_MAINTENANCE",
4: "INVALID_ARGUMENT",
5: "RESOURCE_EXHAUSTED",
}
CommonFail_value = map[string]int32{
"INTERNAL": 0,
@ -161,6 +165,7 @@ var (
"SIGNATURE_VERIFICATION_FAIL": 2,
"NODE_UNDER_MAINTENANCE": 3,
"INVALID_ARGUMENT": 4,
"RESOURCE_EXHAUSTED": 5,
}
)
@ -654,7 +659,7 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x04,
0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x45, 0x5f,
0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x10, 0x05, 0x2a, 0x11, 0x0a, 0x07, 0x53, 0x75, 0x63,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x85, 0x01, 0x0a,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x9d, 0x01, 0x0a,
0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x0c, 0x0a, 0x08, 0x49,
0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x57, 0x52, 0x4f,
0x4e, 0x47, 0x5f, 0x4d, 0x41, 0x47, 0x49, 0x43, 0x5f, 0x4e, 0x55, 0x4d, 0x42, 0x45, 0x52, 0x10,
@ -663,34 +668,35 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x44, 0x45, 0x52,
0x5f, 0x4d, 0x41, 0x49, 0x4e, 0x54, 0x45, 0x4e, 0x41, 0x4e, 0x43, 0x45, 0x10, 0x03, 0x12, 0x14,
0x0a, 0x10, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45,
0x4e, 0x54, 0x10, 0x04, 0x2a, 0x88, 0x01, 0x0a, 0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12,
0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44,
0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54,
0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b,
0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e,
0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52, 0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10,
0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45,
0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a,
0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f, 0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a,
0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13,
0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f,
0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f,
0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46,
0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f,
0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45,
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d,
0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72,
0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43,
0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d,
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a,
0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e,
0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
0x4e, 0x54, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
0x5f, 0x45, 0x58, 0x48, 0x41, 0x55, 0x53, 0x54, 0x45, 0x44, 0x10, 0x05, 0x2a, 0x88, 0x01, 0x0a,
0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53,
0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42,
0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01,
0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52,
0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a,
0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f,
0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f,
0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61,
0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45,
0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a,
0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10,
0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31,
0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b,
0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11,
0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10,
0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12,
0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62,
0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e,
0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f,
0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a, 0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
}
var file_api_status_grpc_types_proto_enumTypes = make([]protoimpl.EnumInfo, 7)


@ -144,6 +144,9 @@ const (
// request parameter as the client sent it incorrectly, then this code should
// be used.
CommonFail_INVALID_ARGUMENT CommonFail = 4
// [**1029**] Resource exhausted failure. If the operation cannot be performed
// due to a lack of resources.
CommonFail_RESOURCE_EXHAUSTED CommonFail = 5
)
// Enum value maps for CommonFail.
@ -154,6 +157,7 @@ var (
2: "SIGNATURE_VERIFICATION_FAIL",
3: "NODE_UNDER_MAINTENANCE",
4: "INVALID_ARGUMENT",
5: "RESOURCE_EXHAUSTED",
}
CommonFail_value = map[string]int32{
"INTERNAL": 0,
@ -161,6 +165,7 @@ var (
"SIGNATURE_VERIFICATION_FAIL": 2,
"NODE_UNDER_MAINTENANCE": 3,
"INVALID_ARGUMENT": 4,
"RESOURCE_EXHAUSTED": 5,
}
)
@ -676,7 +681,7 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x04,
0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x41, 0x50, 0x45, 0x5f,
0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x10, 0x05, 0x2a, 0x11, 0x0a, 0x07, 0x53, 0x75, 0x63,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x85, 0x01, 0x0a,
0x63, 0x65, 0x73, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x2a, 0x9d, 0x01, 0x0a,
0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x12, 0x0c, 0x0a, 0x08, 0x49,
0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x57, 0x52, 0x4f,
0x4e, 0x47, 0x5f, 0x4d, 0x41, 0x47, 0x49, 0x43, 0x5f, 0x4e, 0x55, 0x4d, 0x42, 0x45, 0x52, 0x10,
@ -685,34 +690,35 @@ var file_api_status_grpc_types_proto_rawDesc = []byte{
0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x44, 0x45, 0x52,
0x5f, 0x4d, 0x41, 0x49, 0x4e, 0x54, 0x45, 0x4e, 0x41, 0x4e, 0x43, 0x45, 0x10, 0x03, 0x12, 0x14,
0x0a, 0x10, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45,
0x4e, 0x54, 0x10, 0x04, 0x2a, 0x88, 0x01, 0x0a, 0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12,
0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44,
0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54,
0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b,
0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e,
0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52, 0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10,
0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45,
0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a,
0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f, 0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a,
0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13,
0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f,
0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f,
0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46,
0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f,
0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45,
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d,
0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41, 0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45,
0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72,
0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43,
0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d,
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a,
0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e,
0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
0x4e, 0x54, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
0x5f, 0x45, 0x58, 0x48, 0x41, 0x55, 0x53, 0x54, 0x45, 0x44, 0x10, 0x05, 0x2a, 0x88, 0x01, 0x0a,
0x06, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x41, 0x43, 0x43, 0x45, 0x53,
0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x42,
0x4a, 0x45, 0x43, 0x54, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01,
0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4e, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x47, 0x55, 0x4c, 0x41, 0x52,
0x5f, 0x4f, 0x42, 0x4a, 0x45, 0x43, 0x54, 0x10, 0x03, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x42, 0x4a,
0x45, 0x43, 0x54, 0x5f, 0x41, 0x4c, 0x52, 0x45, 0x41, 0x44, 0x59, 0x5f, 0x52, 0x45, 0x4d, 0x4f,
0x56, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x4f, 0x55, 0x54, 0x5f, 0x4f, 0x46, 0x5f,
0x52, 0x41, 0x4e, 0x47, 0x45, 0x10, 0x05, 0x2a, 0x55, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61,
0x69, 0x6e, 0x65, 0x72, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45,
0x52, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a,
0x0e, 0x45, 0x41, 0x43, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10,
0x01, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x31,
0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x4f, 0x4b,
0x45, 0x4e, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x11,
0x0a, 0x0d, 0x54, 0x4f, 0x4b, 0x45, 0x4e, 0x5f, 0x45, 0x58, 0x50, 0x49, 0x52, 0x45, 0x44, 0x10,
0x01, 0x2a, 0x2b, 0x0a, 0x0a, 0x41, 0x50, 0x45, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12,
0x1d, 0x0a, 0x19, 0x41, 0x50, 0x45, 0x5f, 0x4d, 0x41, 0x4e, 0x41, 0x47, 0x45, 0x52, 0x5f, 0x41,
0x43, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x00, 0x42, 0x62,
0x5a, 0x43, 0x67, 0x69, 0x74, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2e, 0x69, 0x6e,
0x66, 0x6f, 0x2f, 0x54, 0x72, 0x75, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x4c, 0x61, 0x62, 0x2f,
0x66, 0x72, 0x6f, 0x73, 0x74, 0x66, 0x73, 0x2d, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0xaa, 0x02, 0x1a, 0x4e, 0x65, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x50, 0x49, 0x2e, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07,
}
var file_api_status_grpc_types_proto_enumTypes = make([]protoimpl.EnumInfo, 7)


@ -65,6 +65,8 @@ const (
NodeUnderMaintenance
// InvalidArgument is a local Code value for INVALID_ARGUMENT status.
InvalidArgument
// ResourceExhausted is a local Code value for RESOURCE_EXHAUSTED status.
ResourceExhausted
)
const (


@ -296,3 +296,62 @@ func (x *InvalidArgument) SetMessage(v string) {
func (x InvalidArgument) Message() string {
return x.v2.Message()
}
// ResourceExhausted is a failure status indicating that
// the operation cannot be performed due to a lack of resources.
// Instances provide Status and StatusV2 interfaces.
type ResourceExhausted struct {
v2 status.Status
}
const defaultResourceExhaustedMsg = "resource exhausted"
// Error implements the error interface.
func (x *ResourceExhausted) Error() string {
msg := x.v2.Message()
if msg == "" {
msg = defaultResourceExhaustedMsg
}
return errMessageStatusV2(
globalizeCodeV2(status.ResourceExhausted, status.GlobalizeCommonFail),
msg,
)
}
// implements local interface defined in FromStatusV2 func.
func (x *ResourceExhausted) fromStatusV2(st *status.Status) {
x.v2 = *st
}
// ToStatusV2 implements StatusV2 interface method.
// If the value was returned by FromStatusV2, returns the source message.
// Otherwise, returns message with
// - code: RESOURCE_EXHAUSTED;
// - string message: written message via SetMessage or
// "resource exhausted" as a default message;
// - details: empty.
func (x ResourceExhausted) ToStatusV2() *status.Status {
x.v2.SetCode(globalizeCodeV2(status.ResourceExhausted, status.GlobalizeCommonFail))
if x.v2.Message() == "" {
x.v2.SetMessage(defaultResourceExhaustedMsg)
}
return &x.v2
}
// SetMessage writes resource exhausted failure message.
// Message should be used for debug purposes only.
//
// See also Message.
func (x *ResourceExhausted) SetMessage(v string) {
x.v2.SetMessage(v)
}
// Message returns status message. Zero status returns empty message.
// Message should be used for debug purposes only.
//
// See also SetMessage.
func (x ResourceExhausted) Message() string {
return x.v2.Message()
}


@ -167,3 +167,42 @@ func TestInvalidArgument(t *testing.T) {
require.Equal(t, msg, stV2.Message())
})
}
func TestResourceExhausted(t *testing.T) {
t.Run("default", func(t *testing.T) {
var st apistatus.ResourceExhausted
require.Empty(t, st.Message())
})
t.Run("custom message", func(t *testing.T) {
var st apistatus.ResourceExhausted
msg := "some message"
st.SetMessage(msg)
stV2 := st.ToStatusV2()
require.Equal(t, msg, st.Message())
require.Equal(t, msg, stV2.Message())
})
t.Run("empty to V2", func(t *testing.T) {
var st apistatus.ResourceExhausted
stV2 := st.ToStatusV2()
require.Equal(t, "resource exhausted", stV2.Message())
})
t.Run("non-empty to V2", func(t *testing.T) {
var st apistatus.ResourceExhausted
msg := "some other msg"
st.SetMessage(msg)
stV2 := st.ToStatusV2()
require.Equal(t, msg, stV2.Message())
})
}


@ -80,6 +80,8 @@ func FromStatusV2(st *status.Status) Status {
decoder = new(NodeUnderMaintenance)
case status.InvalidArgument:
decoder = new(InvalidArgument)
case status.ResourceExhausted:
decoder = new(ResourceExhausted)
}
case object.LocalizeFailStatus(&code):
switch code {


@ -3,6 +3,7 @@ package cid
import (
"crypto/sha256"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
"github.com/mr-tron/base58"
@ -113,3 +114,9 @@ func (id *ID) DecodeString(s string) error {
func (id ID) String() string {
return id.EncodeToString()
}
// Cmp returns an integer comparing two base58 encoded container IDs lexicographically.
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
func (id ID) Cmp(id2 ID) int {
return strings.Compare(id.EncodeToString(), id2.EncodeToString())
}


@ -3,6 +3,8 @@ package cid_test
import (
"crypto/rand"
"crypto/sha256"
"slices"
"strings"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
@ -106,3 +108,17 @@ func TestID_Encode(t *testing.T) {
require.Equal(t, emptyID, id.EncodeToString())
})
}
func TestID_Cmp(t *testing.T) {
var arr []cid.ID
for i := 0; i < 3; i++ {
checksum := randSHA256Checksum()
arr = append(arr, cidtest.IDWithChecksum(checksum))
}
slices.SortFunc(arr, cid.ID.Cmp)
for i := 1; i < len(arr); i++ {
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
}
}


@ -96,6 +96,21 @@ func (m NetMap) Epoch() uint64 {
return m.epoch
}
// Clone returns a copy of NetMap.
func (m *NetMap) Clone() *NetMap {
if m == nil {
return nil
}
dst := NetMap{
epoch: m.epoch,
nodes: make([]NodeInfo, len(m.nodes)),
}
for i, node := range m.nodes {
dst.nodes[i] = *node.Clone()
}
return &dst
}
// nodes is a slice of NodeInfo instances needed for HRW sorting.
type nodes []NodeInfo


@ -1,6 +1,7 @@
package netmap_test
import (
"bytes"
"testing"
v2netmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
@ -45,3 +46,28 @@ func TestNetMap_SetEpoch(t *testing.T) {
require.EqualValues(t, e, m.Epoch())
}
func TestNetMap_Clone(t *testing.T) {
nm := new(netmap.NetMap)
nm.SetEpoch(1)
var ni netmap.NodeInfo
ni.SetPublicKey([]byte{1, 2, 3})
nm.SetNodes([]netmap.NodeInfo{ni})
clone := nm.Clone()
require.True(t, clone != nm)
require.True(t, &(clone.Nodes()[0]) != &(nm.Nodes()[0]))
var clonev2 v2netmap.NetMap
clone.WriteToV2(&clonev2)
var bufClone []byte
bufClone = clonev2.StableMarshal(bufClone)
var nmv2 v2netmap.NetMap
nm.WriteToV2(&nmv2)
var bufNM []byte
bufNM = nmv2.StableMarshal(bufNM)
require.True(t, bytes.Equal(bufClone, bufNM))
}


@ -563,6 +563,17 @@ func (x *NodeInfo) SetStatus(state NodeState) {
x.m.SetState(netmap.NodeState(state))
}
// Clone returns a copy of NodeInfo.
func (x *NodeInfo) Clone() *NodeInfo {
if x == nil {
return nil
}
return &NodeInfo{
hash: x.hash,
m: *x.m.Clone(),
}
}
// String implements fmt.Stringer.
//
// String is designed to be human-readable, and its format MAY differ between


@ -108,3 +108,12 @@ func TestNodeInfo_ExternalAddr(t *testing.T) {
n.SetExternalAddresses(addr[1:]...)
require.Equal(t, addr[1:], n.ExternalAddresses())
}
func TestNodeInfo_Clone(t *testing.T) {
var ni NodeInfo
ni.SetPublicKey([]byte{2, 3})
c := ni.Clone()
require.True(t, c != &ni)
require.True(t, &(c.PublicKey()[0]) != &(ni.PublicKey()[0]))
}


@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"crypto/sha256"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
@ -167,3 +168,9 @@ func (id *ID) UnmarshalJSON(data []byte) error {
return nil
}
// Cmp returns an integer comparing two base58 encoded object IDs lexicographically.
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
func (id ID) Cmp(id2 ID) int {
return strings.Compare(id.EncodeToString(), id2.EncodeToString())
}


@ -3,7 +3,9 @@ package oid
import (
"crypto/rand"
"crypto/sha256"
"slices"
"strconv"
"strings"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
@ -180,3 +182,16 @@ func TestID_Encode(t *testing.T) {
require.Equal(t, emptyID, id.EncodeToString())
})
}
func TestID_Cmp(t *testing.T) {
id1 := randID(t)
id2 := randID(t)
id3 := randID(t)
arr := []ID{id1, id2, id3}
slices.SortFunc(arr, ID.Cmp)
for i := 1; i < len(arr); i++ {
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
}
}


@ -6,6 +6,7 @@ import (
"crypto/sha256"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -327,4 +328,11 @@ func (s *payloadSizeLimiter) prepareFirstChild() {
s.current.SetAttributes()
// attributes will be added to parent in detachParent
// add expiration epoch to each part
for _, attr := range s.parAttrs {
if attr.Key() == objectV2.SysAttributeExpEpoch {
s.current.SetAttributes(attr)
}
}
}

pool/client.go (new file, +1303 lines): diff suppressed because it is too large

pool/connection_manager.go (new file, +330 lines)

@ -0,0 +1,330 @@
package pool
import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type innerPool struct {
lock sync.RWMutex
sampler *sampler
clients []client
}
type connectionManager struct {
innerPools []*innerPool
rebalanceParams rebalanceParameters
clientBuilder clientBuilder
logger *zap.Logger
healthChecker *healthCheck
}
// newConnectionManager returns an instance of connectionManager configured according to the parameters.
//
// Before using connectionManager, you MUST call Dial.
func newConnectionManager(options InitParameters) (*connectionManager, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
manager := &connectionManager{
logger: options.logger,
rebalanceParams: rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
}
return manager, nil
}
func (cm *connectionManager) dial(ctx context.Context) error {
inner := make([]*innerPool, len(cm.rebalanceParams.nodesParams))
var atLeastOneHealthy bool
for i, params := range cm.rebalanceParams.nodesParams {
clients := make([]client, len(params.weights))
for j, addr := range params.addresses {
clients[j] = cm.clientBuilder(addr)
if err := clients[j].dial(ctx); err != nil {
cm.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
continue
}
atLeastOneHealthy = true
}
source := rand.NewSource(time.Now().UnixNano())
sampl := newSampler(params.weights, source)
inner[i] = &innerPool{
sampler: sampl,
clients: clients,
}
}
if !atLeastOneHealthy {
return fmt.Errorf("at least one node must be healthy")
}
cm.innerPools = inner
cm.healthChecker = newHealthCheck(cm.rebalanceParams.clientRebalanceInterval)
cm.healthChecker.startRebalance(ctx, cm.rebalance)
return nil
}
func (cm *connectionManager) rebalance(ctx context.Context) {
buffers := make([][]float64, len(cm.rebalanceParams.nodesParams))
for i, params := range cm.rebalanceParams.nodesParams {
buffers[i] = make([]float64, len(params.weights))
}
cm.updateNodesHealth(ctx, buffers)
}
func (cm *connectionManager) log(level zapcore.Level, msg string, fields ...zap.Field) {
if cm.logger == nil {
return
}
cm.logger.Log(level, msg, fields...)
}
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no FrostFS peers configured")
}
nodesParamsMap := make(map[int]*nodesParam)
for _, param := range nodeParams {
nodes, ok := nodesParamsMap[param.priority]
if !ok {
nodes = &nodesParam{priority: param.priority}
}
nodes.addresses = append(nodes.addresses, param.address)
nodes.weights = append(nodes.weights, param.weight)
nodesParamsMap[param.priority] = nodes
}
nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
for _, nodes := range nodesParamsMap {
nodes.weights = adjustWeights(nodes.weights)
nodesParams = append(nodesParams, nodes)
}
sort.Slice(nodesParams, func(i, j int) bool {
return nodesParams[i].priority < nodesParams[j].priority
})
return nodesParams, nil
}
func (cm *connectionManager) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg := sync.WaitGroup{}
for i, inner := range cm.innerPools {
wg.Add(1)
bufferWeights := buffers[i]
go func(i int, _ *innerPool) {
defer wg.Done()
cm.updateInnerNodesHealth(ctx, i, bufferWeights)
}(i, inner)
}
wg.Wait()
}
func (cm *connectionManager) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
if i > len(cm.innerPools)-1 {
return
}
pool := cm.innerPools[i]
options := cm.rebalanceParams
healthyChanged := new(atomic.Bool)
wg := sync.WaitGroup{}
for j, cli := range pool.clients {
wg.Add(1)
go func(j int, cli client) {
defer wg.Done()
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c()
changed, err := restartIfUnhealthy(tctx, cli)
healthy := err == nil
if healthy {
bufferWeights[j] = options.nodesParams[i].weights[j]
} else {
bufferWeights[j] = 0
}
if changed {
fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
if err != nil {
fields = append(fields, zap.String("reason", err.Error()))
}
cm.log(zap.DebugLevel, "health has changed", fields...)
healthyChanged.Store(true)
}
}(j, cli)
}
wg.Wait()
if healthyChanged.Load() {
probabilities := adjustWeights(bufferWeights)
source := rand.NewSource(time.Now().UnixNano())
pool.lock.Lock()
pool.sampler = newSampler(probabilities, source)
pool.lock.Unlock()
}
}
// restartIfUnhealthy checks the health status of the client and recreates it if the status is unhealthy.
// It reports whether the status was changed by this call and returns the error that caused the unhealthy status.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
defer func() {
if err != nil {
c.setUnhealthy()
} else {
c.setHealthy()
}
}()
wasHealthy := c.isHealthy()
if res, err := c.healthcheck(ctx); err == nil {
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
if err = c.restart(ctx); err != nil {
return wasHealthy, err
}
res, err := c.healthcheck(ctx)
if err != nil {
return wasHealthy, err
}
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
func adjustWeights(weights []float64) []float64 {
adjusted := make([]float64, len(weights))
sum := 0.0
for _, weight := range weights {
sum += weight
}
if sum > 0 {
for i, weight := range weights {
adjusted[i] = weight / sum
}
}
return adjusted
}
func (cm *connectionManager) connection() (client, error) {
for _, inner := range cm.innerPools {
cp, err := inner.connection()
if err == nil {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
// iterate iterates over all clients in all innerPools.
func (cm *connectionManager) iterate(cb func(client)) {
for _, inner := range cm.innerPools {
for _, cl := range inner.clients {
if cl.isHealthy() {
cb(cl)
}
}
}
}
func (p *innerPool) connection() (client, error) {
p.lock.RLock() // need lock because of using p.sampler
defer p.lock.RUnlock()
if len(p.clients) == 1 {
cp := p.clients[0]
if cp.isHealthy() {
return cp, nil
}
return nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clients)
for range attempts {
i := p.sampler.Next()
if cp := p.clients[i]; cp.isHealthy() {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
func (cm connectionManager) Statistic() Statistic {
stat := Statistic{}
for _, inner := range cm.innerPools {
nodes := make([]string, 0, len(inner.clients))
for _, cl := range inner.clients {
if cl.isHealthy() {
nodes = append(nodes, cl.address())
}
node := NodeStatistic{
address: cl.address(),
methods: cl.methodsStatus(),
overallErrors: cl.overallErrorRate(),
currentErrors: cl.currentErrorRate(),
}
stat.nodes = append(stat.nodes, node)
stat.overallErrors += node.overallErrors
}
if len(stat.currentNodes) == 0 {
stat.currentNodes = nodes
}
}
return stat
}
func (cm *connectionManager) close() {
cm.healthChecker.stopRebalance()
// close all clients
for _, pools := range cm.innerPools {
for _, cli := range pools.clients {
_ = cli.close()
}
}
}

pool/healthcheck.go (new file, +47 lines)

@ -0,0 +1,47 @@
package pool
import (
"context"
"time"
)
type healthCheck struct {
cancel context.CancelFunc
closedCh chan struct{}
clientRebalanceInterval time.Duration
}
func newHealthCheck(clientRebalanceInterval time.Duration) *healthCheck {
var h healthCheck
h.clientRebalanceInterval = clientRebalanceInterval
h.closedCh = make(chan struct{})
return &h
}
// startRebalance runs a loop that monitors connection health status.
func (h *healthCheck) startRebalance(ctx context.Context, callback func(ctx context.Context)) {
ctx, cancel := context.WithCancel(ctx)
h.cancel = cancel
go func() {
ticker := time.NewTicker(h.clientRebalanceInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(h.closedCh)
return
case <-ticker.C:
callback(ctx)
ticker.Reset(h.clientRebalanceInterval)
}
}
}()
}
func (h *healthCheck) stopRebalance() {
h.cancel()
<-h.closedCh
}

File diff suppressed because it is too large


@ -104,7 +104,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
condition := func() bool {
cp, err := clientPool.connection()
cp, err := clientPool.manager.connection()
if err != nil {
return false
}
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
@ -220,13 +220,12 @@ func TestOneOfTwoFailed(t *testing.T) {
err = pool.Dial(context.Background())
require.NoError(t, err)
require.NoError(t, err)
t.Cleanup(pool.Close)
time.Sleep(2 * time.Second)
for range 5 {
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
@ -369,11 +368,11 @@ func TestUpdateNodesHealth(t *testing.T) {
tc.prepareCli(cli)
p, log := newPool(t, cli)
p.updateNodesHealth(ctx, [][]float64{{1}})
p.manager.updateNodesHealth(ctx, [][]float64{{1}})
changed := tc.wasHealthy != tc.willHealthy
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
require.Equalf(t, changed, 1 == log.Len(), "healthy status should be changed: %v", changed)
require.Equalf(t, changed, log.Len() == 1, "healthy status should be changed: %v", changed)
})
}
}
@ -385,19 +384,19 @@ func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
require.NoError(t, err)
return &Pool{
innerPools: []*innerPool{{
sampler: newSampler([]float64{1}, rand.NewSource(0)),
clients: []client{cli},
}},
cache: cache,
key: newPrivateKey(t),
closedCh: make(chan struct{}),
rebalanceParams: rebalanceParameters{
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
nodeRequestTimeout: time.Second,
clientRebalanceInterval: 200 * time.Millisecond,
},
logger: log,
cache: cache,
key: newPrivateKey(t),
manager: &connectionManager{
innerPools: []*innerPool{{
sampler: newSampler([]float64{1}, rand.NewSource(0)),
clients: []client{cli},
}},
healthChecker: newHealthCheck(200 * time.Millisecond),
rebalanceParams: rebalanceParameters{
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
nodeRequestTimeout: time.Second,
},
logger: log},
}, observedLog
}
@ -435,7 +434,7 @@ func TestTwoFailed(t *testing.T) {
time.Sleep(2 * time.Second)
_, err = pool.connection()
_, err = pool.manager.connection()
require.Error(t, err)
require.Contains(t, err.Error(), "no healthy")
}
@ -469,7 +468,7 @@ func TestSessionCache(t *testing.T) {
t.Cleanup(pool.Close)
// cache must contain session token
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
@ -482,7 +481,7 @@ func TestSessionCache(t *testing.T) {
require.Error(t, err)
// cache must not contain session token
cp, err = pool.connection()
cp, err = pool.manager.connection()
require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.False(t, ok)
@ -494,7 +493,7 @@ func TestSessionCache(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err = pool.connection()
cp, err = pool.manager.connection()
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
@ -538,7 +537,7 @@ func TestPriority(t *testing.T) {
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
firstNode := func() bool {
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey1)
@ -546,7 +545,7 @@ func TestPriority(t *testing.T) {
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
secondNode := func() bool {
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
return st.AssertAuthKey(&expectedAuthKey2)
@ -583,7 +582,7 @@ func TestSessionCacheWithKey(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err := pool.connection()
cp, err := pool.manager.connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
@ -636,9 +635,8 @@ func TestSessionTokenOwner(t *testing.T) {
cc.sessionTarget = func(tok session.Object) {
tkn = tok
}
err = p.initCallContext(&cc, prm, prmCtx)
err = p.initCall(&cc, prm, prmCtx)
require.NoError(t, err)
err = p.openDefaultSession(ctx, &cc)
require.NoError(t, err)
require.True(t, tkn.VerifySignature())
@ -922,14 +920,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
t.Cleanup(pool.Close)
for range errorThreshold {
conn, err := pool.connection()
conn, err := pool.manager.connection()
require.NoError(t, err)
require.Equal(t, nodes[0].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})
require.Error(t, err)
}
conn, err := pool.connection()
conn, err := pool.manager.connection()
require.NoError(t, err)
require.Equal(t, nodes[1].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})


@ -47,9 +47,6 @@ func TestHealthyReweight(t *testing.T) {
buffer = make([]float64, len(weights))
)
cache, err := newCache(0)
require.NoError(t, err)
client1 := newMockClient(names[0], *newPrivateKey(t))
client1.errOnDial()
@ -59,22 +56,20 @@ func TestHealthyReweight(t *testing.T) {
sampler: newSampler(weights, rand.NewSource(0)),
clients: []client{client1, client2},
}
p := &Pool{
cm := &connectionManager{
innerPools: []*innerPool{inner},
cache: cache,
key: newPrivateKey(t),
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
}
// check getting first node connection before rebalance happened
connection0, err := p.connection()
connection0, err := cm.connection()
require.NoError(t, err)
mock0 := connection0.(*mockClient)
require.Equal(t, names[0], mock0.address())
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
connection1, err := p.connection()
connection1, err := cm.connection()
require.NoError(t, err)
mock1 := connection1.(*mockClient)
require.Equal(t, names[1], mock1.address())
@ -84,10 +79,10 @@ func TestHealthyReweight(t *testing.T) {
inner.clients[0] = newMockClient(names[0], *newPrivateKey(t))
inner.lock.Unlock()
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
inner.sampler = newSampler(weights, rand.NewSource(0))
connection0, err = p.connection()
connection0, err = cm.connection()
require.NoError(t, err)
mock0 = connection0.(*mockClient)
require.Equal(t, names[0], mock0.address())
@ -108,12 +103,12 @@ func TestHealthyNoReweight(t *testing.T) {
newMockClient(names[1], *newPrivateKey(t)),
},
}
p := &Pool{
cm := &connectionManager{
innerPools: []*innerPool{inner},
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
}
p.updateInnerNodesHealth(context.TODO(), 0, buffer)
cm.updateInnerNodesHealth(context.TODO(), 0, buffer)
inner.lock.RLock()
defer inner.lock.RUnlock()


@ -97,6 +97,11 @@ func (n NodeStatistic) AverageListContainer() time.Duration {
return n.averageTime(methodContainerList)
}
// AverageListContainerStream returns average time to perform ContainerListStream request.
func (n NodeStatistic) AverageListContainerStream() time.Duration {
return n.averageTime(methodContainerListStream)
}
// AverageDeleteContainer returns average time to perform ContainerDelete request.
func (n NodeStatistic) AverageDeleteContainer() time.Duration {
return n.averageTime(methodContainerDelete)


@ -0,0 +1,82 @@
package tree
import (
"errors"
"sync"
"time"
)
type (
circuitBreaker struct {
breakDuration time.Duration
threshold int
mu sync.RWMutex
state map[uint64]state
}
state struct {
counter int
breakTimestamp time.Time
}
)
var ErrCBClosed = errors.New("circuit breaker is closed")
func newCircuitBreaker(breakDuration time.Duration, threshold int) *circuitBreaker {
return &circuitBreaker{
breakDuration: breakDuration,
threshold: threshold,
state: make(map[uint64]state),
}
}
func (cb *circuitBreaker) checkBreak(id uint64) error {
cb.mu.RLock()
s, ok := cb.state[id]
cb.mu.RUnlock()
if ok && time.Since(s.breakTimestamp) < cb.breakDuration {
return ErrCBClosed
}
return nil
}
func (cb *circuitBreaker) openBreak(id uint64) {
cb.mu.Lock()
defer cb.mu.Unlock()
delete(cb.state, id)
}
func (cb *circuitBreaker) incError(id uint64) {
cb.mu.Lock()
defer cb.mu.Unlock()
s := cb.state[id]
s.counter++
if s.counter >= cb.threshold {
s.counter = cb.threshold
if time.Since(s.breakTimestamp) >= cb.breakDuration {
s.breakTimestamp = time.Now()
}
}
cb.state[id] = s
}
func (cb *circuitBreaker) Do(id uint64, f func() error) error {
if err := cb.checkBreak(id); err != nil {
return err
}
err := f()
if err == nil {
cb.openBreak(id)
} else {
cb.incError(id)
}
return err
}


@ -0,0 +1,68 @@
package tree
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestCircuitBreaker(t *testing.T) {
remoteErr := errors.New("service is being synchronized")
breakDuration := 1 * time.Second
threshold := 10
cb := newCircuitBreaker(breakDuration, threshold)
// Hit threshold
for i := 0; i < threshold; i++ {
err := cb.Do(1, func() error { return remoteErr })
require.ErrorIs(t, err, remoteErr)
}
// Different client should not be affected by threshold
require.NoError(t, cb.Do(2, func() error { return nil }))
// Immediate request should return circuit breaker error
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
// Request after breakDuration should be ok
time.Sleep(breakDuration)
require.NoError(t, cb.Do(1, func() error { return nil }))
// Try hitting threshold one more time after break duration
for i := 0; i < threshold; i++ {
err := cb.Do(1, func() error { return remoteErr })
require.ErrorIs(t, err, remoteErr)
}
// Immediate request should return circuit breaker error
require.ErrorIs(t, cb.Do(1, func() error { return nil }), ErrCBClosed)
}
func TestCircuitBreakerNoBlock(t *testing.T) {
remoteErr := errors.New("service is being synchronized")
funcDuration := 200 * time.Millisecond
threshold := 100
cb := newCircuitBreaker(10*funcDuration, threshold)
slowFunc := func() error {
time.Sleep(funcDuration)
return remoteErr
}
for i := 0; i < threshold; i++ {
// run the Do function in multiple goroutines
go func() {
cb.Do(1, slowFunc)
}()
}
time.Sleep(funcDuration)
// eventually, at most one more func duration later, the circuit breaker will be
// closed and not blocked by the slow func executing under the mutex
require.Eventually(t, func() bool {
return errors.Is(cb.Do(1, func() error { return nil }), ErrCBClosed)
}, funcDuration, funcDuration/10)
}


@ -24,10 +24,12 @@ import (
)
const (
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultCircuitBreakerDuration = 10 * time.Second
defaultCircuitBreakerThreshold = 10
)
// SubTreeSort defines an order of nodes returned from GetSubTree RPC.
@ -76,6 +78,8 @@ type InitParameters struct {
dialOptions []grpc.DialOption
maxRequestAttempts int
netMapInfoSource NetMapInfoSource
circuitBreakerThreshold int
circuitBreakerDuration time.Duration
}
type NetMapInfoSource interface {
@ -117,6 +121,8 @@ type Pool struct {
// * retry in case of request failure (see Pool.requestWithRetry)
// startIndices will be used if netMapInfoSource is not set
startIndices [2]int
// circuit breaker for dial operations when netmap is being used
cb *circuitBreaker
}
type innerPool struct {
@ -248,6 +254,10 @@ func NewPool(options InitParameters) (*Pool, error) {
methods: methods,
netMapInfoSource: options.netMapInfoSource,
clientMap: make(map[uint64]client),
cb: newCircuitBreaker(
options.circuitBreakerDuration,
options.circuitBreakerThreshold,
),
}
if options.netMapInfoSource == nil {
@ -366,6 +376,18 @@ func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource)
x.netMapInfoSource = netMapInfoSource
}
// SetCircuitBreakerThreshold sets the number of consecutive failed connections
// after which the circuit is considered closed and requests fail immediately.
func (x *InitParameters) SetCircuitBreakerThreshold(circuitBreakerThreshold int) {
x.circuitBreakerThreshold = circuitBreakerThreshold
}
// SetCircuitBreakerDuration sets how long the circuit stays closed once tripped.
// This effectively limits connection attempts to one per duration.
func (x *InitParameters) SetCircuitBreakerDuration(circuitBreakerDuration time.Duration) {
x.circuitBreakerDuration = circuitBreakerDuration
}
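
A hedged configuration sketch for these setters — caller-side, assuming the pool/tree import path shown below; non-positive values fall back to the defaults:

// sketch only; assumes imports "time",
// "github.com/nspcc-dev/neo-go/pkg/crypto/keys" and
// treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
func newTreePool(key *keys.PrivateKey, src treepool.NetMapInfoSource) (*treepool.Pool, error) {
	var prm treepool.InitParameters
	prm.SetKey(key)
	prm.SetNetMapInfoSource(src)
	prm.SetCircuitBreakerThreshold(5)               // trip after 5 consecutive failed dials
	prm.SetCircuitBreakerDuration(30 * time.Second) // reject dials for 30s once tripped
	return treepool.NewPool(prm)
}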
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
@ -414,12 +436,19 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNod
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
cli *rpcapi.GetSubTreeResponseReader
cli *rpcapi.GetSubTreeResponseReader
probe *tree.GetSubTreeResponseBody
}
// Read reads the next batch of subtree nodes into buf.
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
for i := range buf {
i := 0
if x.probe != nil && len(buf) != 0 {
buf[0] = x.probe
x.probe = nil
i = 1
}
for ; i < len(buf); i++ {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
@ -436,6 +465,10 @@ func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
// ReadAll reads all subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*tree.GetSubTreeResponseBody
if x.probe != nil {
res = append(res, x.probe)
x.probe = nil
}
for {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
@ -452,6 +485,12 @@ func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
// Next gets the next node from subtree.
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
if x.probe != nil {
res := x.probe
x.probe = nil
return res, nil
}
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
@ -495,16 +534,24 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
}
var cli *rpcapi.GetSubTreeResponseReader
var probeBody *tree.GetSubTreeResponseBody
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr)
if inErr != nil {
return handleError("failed to get sub tree client", inErr)
}
probe := &tree.GetSubTreeResponse{}
inErr = cli.Read(probe)
probeBody = probe.GetBody()
return handleError("failed to get first resp from sub tree client", inErr)
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
if err != nil {
return nil, err
}
return &SubTreeReader{cli: cli}, nil
return &SubTreeReader{cli: cli, probe: probeBody}, nil
}
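
Because the first response is now probed inside the retried closure, stream-open failures trigger a retry on the next node, while callers keep draining the reader as before; the probed body is replayed transparently by Next, Read and ReadAll. A hedged consumption sketch, written as if inside package tree (treePool and process are hypothetical):

// sketch only; assumes imports "context", "errors", "io" and
// cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
func drainSubTree(ctx context.Context, treePool *Pool, cnr cid.ID) error {
	reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
	if err != nil {
		return err
	}
	for {
		body, err := reader.Next()
		if errors.Is(err, io.EOF) {
			return nil // stream fully drained
		}
		if err != nil {
			return err
		}
		process(body) // hypothetical per-node handler
	}
}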
// AddNode invokes eponymous method from TreeServiceClient.
@ -764,6 +811,14 @@ func fillDefaultInitParams(params *InitParameters) {
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
if params.circuitBreakerDuration <= 0 {
params.circuitBreakerDuration = defaultCircuitBreakerDuration
}
if params.circuitBreakerThreshold <= 0 {
params.circuitBreakerThreshold = defaultCircuitBreakerThreshold
}
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
@ -959,14 +1014,17 @@ LOOP:
treeCl, ok := p.getClientFromMap(cnrNode.Hash())
if !ok {
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
err = p.cb.Do(cnrNode.Hash(), func() error {
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
return err
})
if err != nil {
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
continue
}
p.addClientToMap(cnrNode.Hash(), treeCl)
treeCl = p.addClientToMap(cnrNode.Hash(), treeCl)
}
attempts--
@ -1001,16 +1059,24 @@ func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
return cl, ok
}
func (p *Pool) addClientToMap(hash uint64, cl client) {
func (p *Pool) addClientToMap(hash uint64, cl client) client {
p.mutex.Lock()
defer p.mutex.Unlock()
if old, ok := p.clientMap[hash]; ok {
_ = cl.close()
return old
}
p.clientMap[hash] = cl
p.mutex.Unlock()
return cl
}
func (p *Pool) deleteClientFromMap(hash uint64) {
p.mutex.Lock()
_ = p.clientMap[hash].close()
delete(p.clientMap, hash)
if cli, ok := p.clientMap[hash]; ok {
_ = cli.close()
delete(p.clientMap, hash)
}
p.mutex.Unlock()
}
@ -1030,6 +1096,16 @@ func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*tre
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err = newTreeCl.dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
// We have to close the connection here after a failed `dial()`.
// This is NOT necessary in the object pool or in the regular tree pool without netmap support, because:
// - the object pool uses the SDK object client, which closes the connection itself during the `dial()` call,
// - the regular tree pool reuses the connection by calling `redialIfNecessary()`.
// The tree pool with netmap support runs no background goroutine, so we have to close the connection immediately.
if err = newTreeCl.close(); err != nil {
p.log(zap.WarnLevel, "failed to close recently dialed tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
}
return false
}


@ -0,0 +1,345 @@
package tree
import (
"bytes"
"context"
"errors"
"io"
"net"
"runtime"
"strconv"
"testing"
apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
apitree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
type mockTreeServer struct {
id int
srv *grpc.Server
lis net.Listener
key *keys.PrivateKey
healthy bool
addCounter int
getSubTreeError error
getSubTreeResponses []*tree.GetSubTreeResponse_Body
getSubTreeCounter int
}
type mockNetmapSource struct {
servers []*mockTreeServer
policy string
}
func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
nm := netmap.NetMap{}
nodes := make([]netmap.NodeInfo, len(m.servers))
for i, server := range m.servers {
ni := apinetmap.NodeInfo{}
ni.SetAddresses(server.lis.Addr().String())
ni.SetPublicKey(server.key.PublicKey().Bytes())
err := nodes[i].ReadFromV2(ni) // no other way to set address field in netmap.NodeInfo
if err != nil {
return nm, err
}
nodes[i].SetAttribute("id", strconv.Itoa(server.id))
}
nm.SetNodes(nodes)
return nm, nil
}
func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
p := netmap.PlacementPolicy{}
return p, p.DecodeString(m.policy)
}
func (m *mockTreeServer) Serve() {
go m.srv.Serve(m.lis)
}
func (m *mockTreeServer) Stop() {
m.srv.Stop()
}
func (m *mockTreeServer) Addr() string {
return m.lis.Addr().String()
}
func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) {
m.addCounter++
return &tree.AddResponse{}, nil
}
func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) GetSubTree(_ *tree.GetSubTreeRequest, s tree.TreeService_GetSubTreeServer) error {
m.getSubTreeCounter++
if m.getSubTreeError != nil {
return m.getSubTreeError
}
for i := range m.getSubTreeResponses {
if err := s.Send(&tree.GetSubTreeResponse{
Body: m.getSubTreeResponses[i],
}); err != nil {
return err
}
}
return nil
}
func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) {
panic("implement me")
}
func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error {
panic("implement me")
}
func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) {
if m.healthy {
return new(tree.HealthcheckResponse), nil
}
return nil, errors.New("not healthy")
}
func createTestServer(t *testing.T, id int) *mockTreeServer {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
res := &mockTreeServer{
id: id,
srv: grpc.NewServer(),
lis: lis,
key: key,
healthy: true,
}
tree.RegisterTreeServiceServer(res.srv, res)
return res
}
func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) {
poolInitParams := InitParameters{}
servers := make([]*mockTreeServer, n)
for i := range servers {
servers[i] = createTestServer(t, i)
servers[i].healthy = true
servers[i].Serve()
poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1))
}
source := &mockNetmapSource{
servers: servers,
policy: p,
}
key, err := keys.NewPrivateKey()
require.NoError(t, err)
poolInitParams.SetKey(key)
poolInitParams.SetNetMapInfoSource(source)
cli, err := NewPool(poolInitParams)
require.NoError(t, err)
return cli, servers, source
}
func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) {
res := make([]*mockTreeServer, len(servers))
snapshot, err := source.NetMapSnapshot(ctx)
if err != nil {
return nil, err
}
policy, err := source.PlacementPolicy(ctx, cnr)
if err != nil {
return nil, err
}
cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:])
if err != nil {
return nil, err
}
priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:])
if err != nil {
return nil, err
}
// find servers based on public key and store pointers in res
index := 0
for i := range priorityNodes {
for j := range priorityNodes[i] {
key := priorityNodes[i][j].PublicKey()
for k := range servers {
if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) {
res[index] = servers[k]
index++
break
}
}
}
}
return res, nil
}
func TestConnectionLeak(t *testing.T) {
const (
numberOfNodes = 4
placementPolicy = "REP 2"
)
// Initialize gRPC servers and create pool with netmap source
treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
for i := range servers {
defer servers[i].Stop()
}
cnr := cidtest.ID()
ctx := context.Background()
// Make priority node for cnr unhealthy, so it is going to be redialled on every request
sortedServers, err := sortServers(ctx, servers, source, cnr)
require.NoError(t, err)
sortedServers[0].healthy = false
// Make RPC and check that pool switched to healthy server
_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
require.NoError(t, err)
require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy
require.Equal(t, 1, sortedServers[1].addCounter) // healthy
// Check that goroutines are not leaked during multiple requests
routinesBefore := runtime.NumGoroutine()
for i := 0; i < 1000; i++ {
_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
require.NoError(t, err)
}
// not more than 1 extra goroutine is created due to async operations
require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1)
}
func TestStreamRetry(t *testing.T) {
const (
numberOfNodes = 4
placementPolicy = "REP 2"
)
expected := []*tree.GetSubTreeResponse_Body{
{
NodeId: []uint64{1},
},
{
NodeId: []uint64{2},
},
{
NodeId: []uint64{3},
},
}
// Initialize gRPC servers and create pool with netmap source
treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
defer func() {
for i := range servers {
servers[i].Stop()
}
}()
cnr := cidtest.ID()
ctx := context.Background()
sortedServers, err := sortServers(ctx, servers, source, cnr)
require.NoError(t, err)
// The last priority node returns the expected response; the others return an error
for i := range sortedServers {
if i == len(sortedServers)-1 {
sortedServers[i].getSubTreeResponses = expected
} else {
sortedServers[i].getSubTreeError = errors.New("tree not found")
}
}
t.Run("read all", func(t *testing.T) {
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
require.NoError(t, err)
data, err := reader.ReadAll()
require.NoError(t, err)
require.Len(t, data, len(expected))
for i := range expected {
require.EqualValues(t, expected[i].GetNodeId(), data[i].GetNodeID())
}
})
t.Run("next", func(t *testing.T) {
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
require.NoError(t, err)
for i := range expected {
resp, err := reader.Next()
require.NoError(t, err)
require.Equal(t, expected[i].GetNodeId(), resp.GetNodeID())
}
_, err = reader.Next()
require.ErrorIs(t, err, io.EOF)
})
t.Run("read", func(t *testing.T) {
reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
require.NoError(t, err)
buf := make([]*apitree.GetSubTreeResponseBody, len(expected))
_, err = reader.Read(buf)
require.NoError(t, err)
require.Len(t, buf, len(expected))
for i := range expected {
require.EqualValues(t, expected[i].GetNodeId(), buf[i].GetNodeID())
}
})
for i := range servers {
// check we retried every available node in the pool three times
require.Equal(t, 3, servers[i].getSubTreeCounter)
}
}


@ -10,6 +10,7 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
netmaptest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -416,6 +417,21 @@ func TestRetryContainerNodes(t *testing.T) {
})
}
func TestDeleteClientTwice(t *testing.T) {
p := Pool{
clientMap: makeClientMap([]netmap.NodeInfo{netmaptest.NodeInfo()}),
}
// emulate two concurrent requests as consecutive calls
// removing the same client from the map twice
for idToDelete := range p.clientMap {
p.deleteClientFromMap(idToDelete)
require.NotPanics(t, func() {
p.deleteClientFromMap(idToDelete)
})
}
require.Empty(t, p.clientMap)
}
func makeInnerPool(nodes [][]string) []*innerPool {
res := make([]*innerPool, len(nodes))


@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
"github.com/mr-tron/base58"
@ -12,9 +13,8 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
)
const idSize = 25
var zeroSlice = bytes.Repeat([]byte{0}, idSize)
// idFullSize is the size of ID in bytes, including prefix and checksum.
const idFullSize = util.Uint160Size + 5
// ID identifies users of the FrostFS system.
//
@ -25,7 +25,7 @@ var zeroSlice = bytes.Repeat([]byte{0}, idSize)
// so it MUST be initialized using some modifying function (e.g. SetScriptHash,
// IDFromKey, etc.).
type ID struct {
w []byte
w util.Uint160
}
// ReadFromV2 reads ID from the refs.OwnerID message. Returns an error if
@ -33,22 +33,7 @@ type ID struct {
//
// See also WriteToV2.
func (x *ID) ReadFromV2(m refs.OwnerID) error {
w := m.GetValue()
if len(w) != idSize {
return fmt.Errorf("invalid length %d, expected %d", len(w), idSize)
}
if w[0] != address.NEO3Prefix {
return fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", w[0], address.NEO3Prefix)
}
if !bytes.Equal(w[21:], hash.Checksum(w[:21])) {
return errors.New("checksum mismatch")
}
x.w = w
return nil
return x.setUserID(m.GetValue())
}
// WriteToV2 writes ID to the refs.OwnerID message.
@ -56,25 +41,17 @@ func (x *ID) ReadFromV2(m refs.OwnerID) error {
//
// See also ReadFromV2.
func (x ID) WriteToV2(m *refs.OwnerID) {
m.SetValue(x.w)
m.SetValue(x.WalletBytes())
}
// SetScriptHash forms user ID from wallet address scripthash.
func (x *ID) SetScriptHash(scriptHash util.Uint160) {
if cap(x.w) < idSize {
x.w = make([]byte, idSize)
} else if len(x.w) < idSize {
x.w = x.w[:idSize]
}
x.w[0] = address.Prefix
copy(x.w[1:], scriptHash.BytesBE())
copy(x.w[21:], hash.Checksum(x.w[:21]))
x.w = scriptHash
}
// ScriptHash calculates and returns script hash of ID.
func (x *ID) ScriptHash() (util.Uint160, error) {
return util.Uint160DecodeBytesBE(x.w[1:21])
func (x *ID) ScriptHash() util.Uint160 {
return x.w
}
// WalletBytes returns FrostFS user ID as Neo3 wallet address in a binary format.
@ -83,14 +60,18 @@ func (x *ID) ScriptHash() (util.Uint160, error) {
//
// See also Neo3 wallet docs.
func (x ID) WalletBytes() []byte {
return x.w
v := make([]byte, idFullSize)
v[0] = address.Prefix
copy(v[1:], x.w[:])
copy(v[21:], hash.Checksum(v[:21]))
return v
}
// EncodeToString encodes ID into FrostFS API V2 protocol string.
//
// See also DecodeString.
func (x ID) EncodeToString() string {
return base58.Encode(x.w)
return base58.Encode(x.WalletBytes())
}
// DecodeString decodes FrostFS API V2 protocol string. Returns an error
@ -100,14 +81,11 @@ func (x ID) EncodeToString() string {
//
// See also EncodeToString.
func (x *ID) DecodeString(s string) error {
var err error
x.w, err = base58.Decode(s)
w, err := base58.Decode(s)
if err != nil {
return fmt.Errorf("decode base58: %w", err)
}
return nil
return x.setUserID(w)
}
// String implements fmt.Stringer.
@ -121,10 +99,34 @@ func (x ID) String() string {
// Equals defines a comparison relation between two ID instances.
func (x ID) Equals(x2 ID) bool {
return bytes.Equal(x.w, x2.w)
return x.w == x2.w
}
// IsEmpty returns true if ID is the empty value.
func (x ID) IsEmpty() bool {
return bytes.Equal(zeroSlice, x.w)
return x.w == util.Uint160{}
}
func (x *ID) setUserID(w []byte) error {
if len(w) != idFullSize {
return fmt.Errorf("invalid length %d, expected %d", len(w), idFullSize)
}
if w[0] != address.NEO3Prefix {
return fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", w[0], address.NEO3Prefix)
}
if !bytes.Equal(w[21:], hash.Checksum(w[:21])) {
return errors.New("checksum mismatch")
}
copy(x.w[:], w[1:21])
return nil
}
// Cmp returns an integer comparing two base58-encoded user IDs lexicographically.
// The result will be 0 if id1 == id2, -1 if id1 < id2, and +1 if id1 > id2.
func (x ID) Cmp(x2 ID) int {
return strings.Compare(x.EncodeToString(), x2.EncodeToString())
}
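
For illustration, a hedged round-trip sketch of the new fixed-size representation — hypothetical caller code, not part of the diff. The wallet form is idFullSize = 25 bytes (one prefix byte, the 20-byte script hash, a 4-byte checksum) and is now recomputed on demand rather than stored:

// sketch only; assumes imports "fmt",
// "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" and
// "github.com/nspcc-dev/neo-go/pkg/util"
func exampleRoundTrip() {
	var id user.ID
	id.SetScriptHash(util.Uint160{0x01}) // hypothetical script hash
	w := id.WalletBytes()                // 25 bytes: prefix + script hash + checksum
	var id2 user.ID
	if err := id2.DecodeString(id.EncodeToString()); err != nil {
		panic(err)
	}
	fmt.Println(len(w), id.Equals(id2)) // 25 true
}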


@ -3,6 +3,8 @@ package user_test
import (
"bytes"
"crypto/rand"
"slices"
"strings"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
@ -51,8 +53,7 @@ func TestID_SetScriptHash(t *testing.T) {
func TestID_ScriptHash(t *testing.T) {
userID := usertest.ID()
scriptHash, err := userID.ScriptHash()
require.NoError(t, err)
scriptHash := userID.ScriptHash()
ownerAddress := userID.EncodeToString()
decodedScriptHash, err := address.StringToUint160(ownerAddress)
@ -133,3 +134,16 @@ func TestID_Equal(t *testing.T) {
require.True(t, id3.Equals(id1)) // commutativity
require.False(t, id1.Equals(id2))
}
func TestID_Cmp(t *testing.T) {
id1 := usertest.ID()
id2 := usertest.ID()
id3 := usertest.ID()
arr := []ID{id1, id2, id3}
slices.SortFunc(arr, ID.Cmp)
for i := 1; i < len(arr); i++ {
require.NotEqual(t, strings.Compare(arr[i-1].EncodeToString(), arr[i].EncodeToString()), 1, "array is not sorted correctly")
}
}