Compare commits

..

13 commits

Author SHA1 Message Date
6fbe1595cb [#121] pool: Refactor PrmObjectSearch usage
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-11-01 17:45:15 +03:00
a9237aabd2 [#121] client: Make PrmObjectSearch fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-11-01 17:45:06 +03:00
a487033505 [#121] client: Nuke out unused prmCommonMeta
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-10-31 22:55:25 +03:00
51c3618850 [#121] client: Make PrmContainerList fields public
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-10-31 22:55:25 +03:00
665e5807bc [#188] transformer: Allow to provide size hint
For big objects with known size we can optimize allocation patterns
by providing size hint. As with any hint, it does not affect transformer
functionality: slices with capacity > MaxSize are never allocated.

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
                                                │     out     │
                                                │   sec/op    │
Transformer/small/no_size_hint-8                  65.44µ ± 3%
Transformer/small/no_size_hint,_with_buffer-8     64.24µ ± 5%
Transformer/small/with_size_hint,_with_buffer-8   58.70µ ± 5%
Transformer/big/no_size_hint-8                    367.8m ± 3%
Transformer/big/no_size_hint,_with_buffer-8       562.7m ± 0%
Transformer/big/with_size_hint,_with_buffer-8     385.6m ± 7%
geomean                                           5.197m

                                                │     out      │
                                                │     B/op     │
Transformer/small/no_size_hint-8                  13.40Ki ± 0%
Transformer/small/no_size_hint,_with_buffer-8     13.40Ki ± 0%
Transformer/small/with_size_hint,_with_buffer-8   13.39Ki ± 0%
Transformer/big/no_size_hint-8                    288.0Mi ± 0%
Transformer/big/no_size_hint,_with_buffer-8       1.390Gi ± 0%
Transformer/big/with_size_hint,_with_buffer-8     288.0Mi ± 0%
geomean                                           2.533Mi

                                                │    out     │
                                                │ allocs/op  │
Transformer/small/no_size_hint-8                  92.00 ± 0%
Transformer/small/no_size_hint,_with_buffer-8     92.00 ± 0%
Transformer/small/with_size_hint,_with_buffer-8   92.00 ± 0%
Transformer/big/no_size_hint-8                    546.5 ± 0%
Transformer/big/no_size_hint,_with_buffer-8       607.5 ± 0%
Transformer/big/with_size_hint,_with_buffer-8     545.5 ± 0%
geomean                                           228.1
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-30 12:13:04 +00:00
a02c0bfac8 [#186] netmap: Marshal policy with brackets
Brackets can be semantically important and must not be omitted,
otherwise the output is plain wrong.
We do not take the responsibility to preserve every bracket, though,
because parser does some optimizations related to grouping long chains
of filters combined with the same operation.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-27 10:54:45 +03:00
20d325e307 [#167] netmap: Fix reverse min agregator
The higher the price, the lower reverse min weight should be.
Previously nodes with 0 price had 0 weight which is a bit misleading.

Introduced in d71a0e0755.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-27 07:53:19 +00:00
670619d242 [#131] client: keep backwards-compatibility, update README.md, fix chore
Signed-off-by: Egor Olefirenko <egor.olefirenko892@gmail.com>
2023-10-26 14:35:49 +00:00
0d79d10482 [#131] client: rename option consistently and fix test
Signed-off-by: Egor Olefirenko <egor.olefirenko892@gmail.com>
2023-10-26 14:35:49 +00:00
9727beb47d [#131] client: Switch ResolveFrostFSFailures to DontResolveFrostFSFailures option
Signed-off-by: Egor Olefirenko <egor.olefirenko892@gmail.com>
2023-10-26 14:35:49 +00:00
84315fab6a [#121] client: Make PrmBalanceGet fields public
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-10-23 18:53:14 +03:00
71335489ae [#183] forgejo: Make linter great again
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-23 18:13:40 +03:00
4c1feaf2cb [#182] pool: Fix linter error about deprecated methods
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-10-23 17:40:00 +03:00
35 changed files with 463 additions and 325 deletions

View file

@ -8,17 +8,24 @@ jobs:
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: golangci-lint - name: Set up Go
uses: https://github.com/golangci/golangci-lint-action@v2 uses: actions/setup-go@v3
with: with:
version: latest go-version: '1.21'
cache: true
- name: Install linters
run: make lint-install
- name: Run linters
run: make lint
tests: tests:
name: Tests name: Tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
go_versions: [ '1.19', '1.20' ] go_versions: [ '1.20', '1.21' ]
fail-fast: false fail-fast: false
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3

4
.gitignore vendored
View file

@ -27,3 +27,7 @@ antlr-*.jar
# tempfiles # tempfiles
.cache .cache
# binary
bin/
release/

View file

@ -1,6 +1,11 @@
#!/usr/bin/make -f #!/usr/bin/make -f
ANTLR_VERSION="4.13.0" ANTLR_VERSION="4.13.0"
TMP_DIR := .cache
LINT_VERSION ?= 1.55.0
TRUECLOUDLAB_LINT_VERSION ?= 0.0.2
OUTPUT_LINT_DIR ?= $(shell pwd)/bin
LINT_DIR = $(OUTPUT_LINT_DIR)/golangci-lint-$(LINT_VERSION)-v$(TRUECLOUDLAB_LINT_VERSION)
# Run tests # Run tests
test: test:
@ -15,9 +20,23 @@ dep:
@CGO_ENABLED=0 \ @CGO_ENABLED=0 \
go mod tidy -v && echo OK go mod tidy -v && echo OK
# Install linters
lint-install:
@mkdir -p $(TMP_DIR)
@rm -rf $(TMP_DIR)/linters
@git -c advice.detachedHead=false clone --branch v$(TRUECLOUDLAB_LINT_VERSION) https://git.frostfs.info/TrueCloudLab/linters.git $(TMP_DIR)/linters
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
@rm -rf $(TMP_DIR)/linters
@rmdir $(TMP_DIR) 2>/dev/null || true
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
# Run linters # Run linters
lint: lint:
@golangci-lint --timeout=5m run @if [ ! -d "$(LINT_DIR)" ]; then \
echo "Run make lint-install"; \
exit 1; \
fi
$(LINT_DIR)/golangci-lint run
# Run tests with race detection and produce coverage output # Run tests with race detection and produce coverage output
cover: cover:

View file

@ -42,7 +42,6 @@ Contains client for working with FrostFS.
```go ```go
var prmInit client.PrmInit var prmInit client.PrmInit
prmInit.SetDefaultPrivateKey(key) // private key for request signing prmInit.SetDefaultPrivateKey(key) // private key for request signing
prmInit.ResolveFrostFSFailures() // enable erroneous status parsing
var c client.Client var c client.Client
c.Init(prmInit) c.Init(prmInit)
@ -77,8 +76,7 @@ if needed and perform any desired action. In the case above we may want to repor
these details to the user as well as retry an operation, possibly with different parameters. these details to the user as well as retry an operation, possibly with different parameters.
Status wire-format is extendable and each node can report any set of details it wants. Status wire-format is extendable and each node can report any set of details it wants.
The set of reserved status codes can be found in The set of reserved status codes can be found in
[FrostFS API](https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/status/types.proto). There is also [FrostFS API](https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/status/types.proto).
a `client.PrmInit.ResolveFrostFSFailures()` to seamlessly convert erroneous statuses into Go error type.
### policy ### policy
Contains helpers allowing conversion of placing policy from/to JSON representation Contains helpers allowing conversion of placing policy from/to JSON representation

View file

@ -17,26 +17,26 @@ import (
// PrmBalanceGet groups parameters of BalanceGet operation. // PrmBalanceGet groups parameters of BalanceGet operation.
type PrmBalanceGet struct { type PrmBalanceGet struct {
prmCommonMeta XHeaders []string
accountSet bool Account *user.ID
account user.ID
} }
// SetAccount sets identifier of the FrostFS account for which the balance is requested. // SetAccount sets identifier of the FrostFS account for which the balance is requested.
// Required parameter. // Required parameter.
//
// Deprecated: Use PrmBalanceGet.Account instead.
func (x *PrmBalanceGet) SetAccount(id user.ID) { func (x *PrmBalanceGet) SetAccount(id user.ID) {
x.account = id x.Account = &id
x.accountSet = true
} }
func (x *PrmBalanceGet) buildRequest(c *Client) (*v2accounting.BalanceRequest, error) { func (x *PrmBalanceGet) buildRequest(c *Client) (*v2accounting.BalanceRequest, error) {
if !x.accountSet { if x.Account == nil {
return nil, errorAccountNotSet return nil, errorAccountNotSet
} }
var accountV2 refs.OwnerID var accountV2 refs.OwnerID
x.account.WriteToV2(&accountV2) x.Account.WriteToV2(&accountV2)
var body v2accounting.BalanceRequestBody var body v2accounting.BalanceRequestBody
body.SetOwnerID(&accountV2) body.SetOwnerID(&accountV2)
@ -64,9 +64,9 @@ func (x ResBalanceGet) Amount() accounting.Decimal {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`, // Any client's internal or transport errors are returned as `error`,
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmBalanceGet docs). // Returns an error if parameters are set incorrectly (see PrmBalanceGet docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -144,7 +144,7 @@ func (c *Client) Close() error {
// //
// See also Init. // See also Init.
type PrmInit struct { type PrmInit struct {
resolveFrostFSErrors bool disableFrostFSErrorResolution bool
key ecdsa.PrivateKey key ecdsa.PrivateKey
@ -161,12 +161,16 @@ func (x *PrmInit) SetDefaultPrivateKey(key ecdsa.PrivateKey) {
x.key = key x.key = key
} }
// ResolveFrostFSFailures makes the Client to resolve failure statuses of the // Deprecated: method is no-op. Option is default.
// FrostFS protocol into Go built-in errors. These errors are returned from
// each protocol operation. By default, statuses aren't resolved and written
// to the resulting structure (see corresponding Res* docs).
func (x *PrmInit) ResolveFrostFSFailures() { func (x *PrmInit) ResolveFrostFSFailures() {
x.resolveFrostFSErrors = true }
// DisableFrostFSFailuresResolution makes the Client to preserve failure statuses of the
// FrostFS protocol only in resulting structure (see corresponding Res* docs).
// These errors are returned from each protocol operation. By default, statuses
// are resolved and returned as a Go built-in errors.
func (x *PrmInit) DisableFrostFSFailuresResolution() {
x.disableFrostFSErrorResolution = true
} }
// SetResponseInfoCallback makes the Client to pass ResponseMetaInfo from each // SetResponseInfoCallback makes the Client to pass ResponseMetaInfo from each

View file

@ -24,24 +24,6 @@ func (x statusRes) Status() apistatus.Status {
return x.st return x.st
} }
// groups meta parameters shared between all Client operations.
type prmCommonMeta struct {
// FrostFS request X-Headers
xHeaders []string
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *prmCommonMeta) WithXHeaders(hs ...string) {
if len(hs)%2 != 0 {
panic("slice of X-Headers with odd length")
}
x.xHeaders = hs
}
func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) { func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
if len(xHeaders) == 0 { if len(xHeaders) == 0 {
return return
@ -119,7 +101,7 @@ func (c *Client) processResponse(resp responseV2) (apistatus.Status, error) {
} }
st := apistatus.FromStatusV2(resp.GetMetaHeader().GetStatus()) st := apistatus.FromStatusV2(resp.GetMetaHeader().GetStatus())
if c.prm.resolveFrostFSErrors { if !c.prm.disableFrostFSErrorResolution {
return st, apistatus.ErrFromStatus(st) return st, apistatus.ErrFromStatus(st)
} }
return st, nil return st, nil

View file

@ -101,9 +101,9 @@ type ResContainerDelete struct {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Operation is asynchronous and no guaranteed even in the absence of errors. // Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable. // The required time is also not predictable.

View file

@ -68,9 +68,9 @@ func (x ResContainerEACL) Table() eacl.Table {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmContainerEACL docs). // Returns an error if parameters are set incorrectly (see PrmContainerEACL docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -71,9 +71,9 @@ func (x ResContainerGet) Container() container.Container {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmContainerGet docs). // Returns an error if parameters are set incorrectly (see PrmContainerGet docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -17,26 +17,26 @@ import (
// PrmContainerList groups parameters of ContainerList operation. // PrmContainerList groups parameters of ContainerList operation.
type PrmContainerList struct { type PrmContainerList struct {
prmCommonMeta XHeaders []string
ownerSet bool Account *user.ID
ownerID user.ID
} }
// SetAccount sets identifier of the FrostFS account to list the containers. // SetAccount sets identifier of the FrostFS account to list the containers.
// Required parameter. // Required parameter.
//
// Deprecated: Use PrmContainerList.Account instead.
func (x *PrmContainerList) SetAccount(id user.ID) { func (x *PrmContainerList) SetAccount(id user.ID) {
x.ownerID = id x.Account = &id
x.ownerSet = true
} }
func (x *PrmContainerList) buildRequest(c *Client) (*v2container.ListRequest, error) { func (x *PrmContainerList) buildRequest(c *Client) (*v2container.ListRequest, error) {
if !x.ownerSet { if x.Account == nil {
return nil, errorAccountNotSet return nil, errorAccountNotSet
} }
var ownerV2 refs.OwnerID var ownerV2 refs.OwnerID
x.ownerID.WriteToV2(&ownerV2) x.Account.WriteToV2(&ownerV2)
reqBody := new(v2container.ListRequestBody) reqBody := new(v2container.ListRequestBody)
reqBody.SetOwnerID(&ownerV2) reqBody.SetOwnerID(&ownerV2)
@ -65,9 +65,9 @@ func (x ResContainerList) Containers() []cid.ID {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmContainerList docs). // Returns an error if parameters are set incorrectly (see PrmContainerList docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -110,9 +110,9 @@ func (x ResContainerPut) ID() cid.ID {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Operation is asynchronous and no guaranteed even in the absence of errors. // Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable. // The required time is also not predictable.

View file

@ -101,9 +101,9 @@ type ResContainerSetEACL struct {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Operation is asynchronous and no guaranteed even in the absence of errors. // Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable. // The required time is also not predictable.

View file

@ -57,9 +57,9 @@ type ResAnnounceSpace struct {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Operation is asynchronous and no guaranteed even in the absence of errors. // Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable. // The required time is also not predictable.

View file

@ -53,9 +53,9 @@ func (x ResEndpointInfo) NodeInfo() netmap.NodeInfo {
// Method can be used as a health check to see if node is alive and responds to requests. // Method can be used as a health check to see if node is alive and responds to requests.
// //
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmEndpointInfo docs). // Returns an error if parameters are set incorrectly (see PrmEndpointInfo docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.
@ -140,9 +140,9 @@ func (x ResNetworkInfo) Info() netmap.NetworkInfo {
// NetworkInfo requests information about the FrostFS network of which the remote server is a part. // NetworkInfo requests information about the FrostFS network of which the remote server is a part.
// //
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmNetworkInfo docs). // Returns an error if parameters are set incorrectly (see PrmNetworkInfo docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.
@ -204,9 +204,9 @@ func (x ResNetMapSnapshot) NetMap() netmap.NetMap {
// NetMapSnapshot requests current network view of the remote server. // NetMapSnapshot requests current network view of the remote server.
// //
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly. // Returns an error if parameters are set incorrectly.
// Context is required and MUST NOT be nil. It is used for network communication. // Context is required and MUST NOT be nil. It is used for network communication.

View file

@ -67,6 +67,7 @@ func TestClient_NetMapSnapshot(t *testing.T) {
var res *ResNetMapSnapshot var res *ResNetMapSnapshot
var srv serverNetMap var srv serverNetMap
c := newClient(&srv) c := newClient(&srv)
c.prm.DisableFrostFSFailuresResolution()
ctx := context.Background() ctx := context.Background()
// request signature // request signature

View file

@ -112,9 +112,9 @@ func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, er
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`, // Any client's internal or transport errors are returned as `error`,
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmObjectDelete docs). // Returns an error if parameters are set incorrectly (see PrmObjectDelete docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -444,9 +444,9 @@ func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error)
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`, // Any client's internal or transport errors are returned as `error`,
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmObjectHead docs). // Returns an error if parameters are set incorrectly (see PrmObjectHead docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -152,9 +152,9 @@ func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`, // Any client's internal or transport errors are returned as `error`,
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmObjectHash docs). // Returns an error if parameters are set incorrectly (see PrmObjectHash docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -83,7 +83,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err
wrt.WritePayloadChunk(ctx, o.Payload()) wrt.WritePayloadChunk(ctx, o.Payload())
} }
it.res, err = wrt.Close(ctx) it.res, err = wrt.Close(ctx)
if err == nil && !it.client.prm.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.st) { if err == nil && it.client.prm.disableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
err = apistatus.ErrFromStatus(it.res.st) err = apistatus.ErrFromStatus(it.res.st)
} }
return err return err
@ -115,7 +115,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
statusRes: res.statusRes, statusRes: res.statusRes,
obj: id, obj: id,
} }
if !it.client.prm.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.st) { if it.client.prm.disableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
return true, apistatus.ErrFromStatus(it.res.st) return true, apistatus.ErrFromStatus(it.res.st)
} }
return true, nil return true, nil

View file

@ -24,19 +24,26 @@ import (
// PrmObjectSearch groups parameters of ObjectSearch operation. // PrmObjectSearch groups parameters of ObjectSearch operation.
type PrmObjectSearch struct { type PrmObjectSearch struct {
meta v2session.RequestMetaHeader XHeaders []string
key *ecdsa.PrivateKey Local bool
cnrSet bool BearerToken *bearer.Token
cnrID cid.ID
filters object.SearchFilters Session *session.Object
ContainerID *cid.ID
Key *ecdsa.PrivateKey
Filters object.SearchFilters
} }
// MarkLocal tells the server to execute the operation locally. // MarkLocal tells the server to execute the operation locally.
//
// Deprecated: Use PrmObjectSearch.Local instead.
func (x *PrmObjectSearch) MarkLocal() { func (x *PrmObjectSearch) MarkLocal() {
x.meta.SetTTL(1) x.Local = true
} }
// WithinSession specifies session within which the search query must be executed. // WithinSession specifies session within which the search query must be executed.
@ -45,10 +52,10 @@ func (x *PrmObjectSearch) MarkLocal() {
// This may affect the execution of an operation (e.g. access control). // This may affect the execution of an operation (e.g. access control).
// //
// Must be signed. // Must be signed.
//
// Deprecated: Use PrmObjectSearch.Session instead.
func (x *PrmObjectSearch) WithinSession(t session.Object) { func (x *PrmObjectSearch) WithinSession(t session.Object) {
var tokv2 v2session.Token x.Session = &t
t.WriteToV2(&tokv2)
x.meta.SetSessionToken(&tokv2)
} }
// WithBearerToken attaches bearer token to be used for the operation. // WithBearerToken attaches bearer token to be used for the operation.
@ -56,37 +63,44 @@ func (x *PrmObjectSearch) WithinSession(t session.Object) {
// If set, underlying eACL rules will be used in access control. // If set, underlying eACL rules will be used in access control.
// //
// Must be signed. // Must be signed.
//
// Deprecated: Use PrmObjectSearch.BearerToken instead.
func (x *PrmObjectSearch) WithBearerToken(t bearer.Token) { func (x *PrmObjectSearch) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken x.BearerToken = &t
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
} }
// WithXHeaders specifies list of extended headers (string key-value pairs) // WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length. // to be attached to the request. Must have an even length.
// //
// Slice must not be mutated until the operation completes. // Slice must not be mutated until the operation completes.
//
// Deprecated: Use PrmObjectSearch.XHeaders instead.
func (x *PrmObjectSearch) WithXHeaders(hs ...string) { func (x *PrmObjectSearch) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta) x.XHeaders = hs
} }
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
//
// Deprecated: Use PrmObjectSearch.Key instead.
func (x *PrmObjectSearch) UseKey(key ecdsa.PrivateKey) { func (x *PrmObjectSearch) UseKey(key ecdsa.PrivateKey) {
x.key = &key x.Key = &key
} }
// InContainer specifies the container in which to look for objects. // InContainer specifies the container in which to look for objects.
// Required parameter. // Required parameter.
//
// Deprecated: Use PrmObjectSearch.ContainerID instead.
func (x *PrmObjectSearch) InContainer(id cid.ID) { func (x *PrmObjectSearch) InContainer(id cid.ID) {
x.cnrID = id x.ContainerID = &id
x.cnrSet = true
} }
// SetFilters sets filters by which to select objects. All container objects // SetFilters sets filters by which to select objects. All container objects
// match unset/empty filters. // match unset/empty filters.
//
// Deprecated: Use PrmObjectSearch.Filters instead.
func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) { func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
x.filters = filters x.Filters = filters
} }
// ResObjectSearch groups the final result values of ObjectSearch operation. // ResObjectSearch groups the final result values of ObjectSearch operation.
@ -212,6 +226,48 @@ func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
return &x.res, nil return &x.res, nil
} }
func (x *PrmObjectSearch) buildRequest(c *Client) (*v2object.SearchRequest, error) {
if x.ContainerID == nil {
return nil, errorMissingContainer
}
if len(x.XHeaders)%2 != 0 {
return nil, errorInvalidXHeaders
}
meta := new(v2session.RequestMetaHeader)
writeXHeadersToMeta(x.XHeaders, meta)
if x.BearerToken != nil {
v2BearerToken := new(acl.BearerToken)
x.BearerToken.WriteToV2(v2BearerToken)
meta.SetBearerToken(v2BearerToken)
}
if x.Session != nil {
v2SessionToken := new(v2session.Token)
x.Session.WriteToV2(v2SessionToken)
meta.SetSessionToken(v2SessionToken)
}
if x.Local {
meta.SetTTL(1)
}
cnrV2 := new(v2refs.ContainerID)
x.ContainerID.WriteToV2(cnrV2)
body := new(v2object.SearchRequestBody)
body.SetVersion(1)
body.SetContainerID(cnrV2)
body.SetFilters(x.Filters.ToV2())
req := new(v2object.SearchRequest)
req.SetBody(body)
c.prepareRequest(req, meta)
return req, nil
}
// ObjectSearchInit initiates object selection through a remote server using FrostFS API protocol. // ObjectSearchInit initiates object selection through a remote server using FrostFS API protocol.
// //
// The call only opens the transmission channel, explicit fetching of matched objects // The call only opens the transmission channel, explicit fetching of matched objects
@ -221,30 +277,17 @@ func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
// Returns an error if parameters are set incorrectly (see PrmObjectSearch docs). // Returns an error if parameters are set incorrectly (see PrmObjectSearch docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*ObjectListReader, error) { func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*ObjectListReader, error) {
// check parameters req, err := prm.buildRequest(c)
if !prm.cnrSet { if err != nil {
return nil, errorMissingContainer return nil, err
} }
var cidV2 v2refs.ContainerID key := prm.Key
prm.cnrID.WriteToV2(&cidV2)
var body v2object.SearchRequestBody
body.SetVersion(1)
body.SetContainerID(&cidV2)
body.SetFilters(prm.filters.ToV2())
// init reader
var req v2object.SearchRequest
req.SetBody(&body)
c.prepareRequest(&req, &prm.meta)
key := prm.key
if key == nil { if key == nil {
key = &c.prm.key key = &c.prm.key
} }
err := signature.SignServiceMessage(key, &req) err = signature.SignServiceMessage(key, req)
if err != nil { if err != nil {
return nil, fmt.Errorf("sign request: %w", err) return nil, fmt.Errorf("sign request: %w", err)
} }
@ -252,7 +295,7 @@ func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*Ob
var r ObjectListReader var r ObjectListReader
ctx, r.cancelCtxStream = context.WithCancel(ctx) ctx, r.cancelCtxStream = context.WithCancel(ctx)
r.stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx)) r.stream, err = rpcapi.SearchObjects(&c.c, req, client.WithContext(ctx))
if err != nil { if err != nil {
return nil, fmt.Errorf("open stream: %w", err) return nil, fmt.Errorf("open stream: %w", err)
} }

View file

@ -89,9 +89,9 @@ func (x ResSessionCreate) PublicKey() []byte {
// //
// Exactly one return value is non-nil. By default, server status is returned in res structure. // Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as `error`. // Any client's internal or transport errors are returned as `error`.
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful // If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
// FrostFS status codes are returned as `error`, otherwise, are included // FrostFS status codes are included in the returned result structure,
// in the returned result structure. // otherwise, are also returned as `error`.
// //
// Returns an error if parameters are set incorrectly (see PrmSessionCreate docs). // Returns an error if parameters are set incorrectly (see PrmSessionCreate docs).
// Context is required and must not be nil. It is used for network communication. // Context is required and must not be nil. It is used for network communication.

View file

@ -156,11 +156,7 @@ func (a *meanIQRAgg) Compute() float64 {
} }
func (r *reverseMinNorm) Normalize(w float64) float64 { func (r *reverseMinNorm) Normalize(w float64) float64 {
if w == 0 { return (r.min + 1) / (w + 1)
return 0
}
return r.min / w
} }
func (r *sigmoidNorm) Normalize(w float64) float64 { func (r *sigmoidNorm) Normalize(w float64) float64 {

View file

@ -146,19 +146,19 @@
"select 3 nodes in 3 distinct countries, same placement": { "select 3 nodes in 3 distinct countries, same placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]}, "policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=", "pivot": "Y29udGFpbmVySUQ=",
"result": [[0, 2, 3]], "result": [ [ 5, 0, 7 ] ],
"placement": { "placement": {
"pivot": "b2JqZWN0SUQ=", "pivot": "b2JqZWN0SUQ=",
"result": [[0, 2, 3]] "result": [ [ 5, 0, 7 ] ]
} }
}, },
"select 6 nodes in 3 distinct countries, different placement": { "select 6 nodes in 3 distinct countries, different placement": {
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]}, "policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
"pivot": "Y29udGFpbmVySUQ=", "pivot": "Y29udGFpbmVySUQ=",
"result": [[0, 1, 2, 6, 3, 4]], "result": [ [ 5, 4, 0, 1, 7, 2 ] ],
"placement": { "placement": {
"pivot": "b2JqZWN0SUQ=", "pivot": "b2JqZWN0SUQ=",
"result": [[0, 1, 2, 6, 3, 4]] "result": [ [ 5, 4, 0, 7, 2, 1 ] ]
} }
} }
} }

View file

@ -455,7 +455,7 @@ func (p PlacementPolicy) WriteStringTo(w io.StringWriter) (err error) {
return err return err
} }
err = writeFilterStringTo(w, p.filters[i]) err = writeFilterStringTo(w, p.filters[i], false)
if err != nil { if err != nil {
return err return err
} }
@ -464,7 +464,7 @@ func (p PlacementPolicy) WriteStringTo(w io.StringWriter) (err error) {
return nil return nil
} }
func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error { func writeFilterStringTo(w io.StringWriter, f netmap.Filter, mayNeedOuterBrackets bool) error {
var err error var err error
var s string var s string
op := f.GetOp() op := f.GetOp()
@ -489,7 +489,7 @@ func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
if err != nil { if err != nil {
return err return err
} }
err = writeFilterStringTo(w, inner[0]) err = writeFilterStringTo(w, inner[0], false)
if err != nil { if err != nil {
return err return err
} }
@ -498,6 +498,13 @@ func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
return err return err
} }
} else { } else {
useBrackets := mayNeedOuterBrackets && op == netmap.OR && len(inner) > 1
if useBrackets {
_, err = w.WriteString("(")
if err != nil {
return err
}
}
for i := range inner { for i := range inner {
if i != 0 { if i != 0 {
_, err = w.WriteString(" " + op.String() + " ") _, err = w.WriteString(" " + op.String() + " ")
@ -505,7 +512,13 @@ func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
return err return err
} }
} }
err = writeFilterStringTo(w, inner[i]) err = writeFilterStringTo(w, inner[i], true)
if err != nil {
return err
}
}
if useBrackets {
_, err = w.WriteString(")")
if err != nil { if err != nil {
return err return err
} }

View file

@ -1,6 +1,7 @@
package netmap_test package netmap_test
import ( import (
"strings"
"testing" "testing"
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" . "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -29,6 +30,79 @@ func TestPlacementPolicyEncoding(t *testing.T) {
}) })
} }
func TestPlacementPolicyWriteString(t *testing.T) {
var testCases = []struct {
name string
input string
output string // If the output is empty, make it equal to input.
}{
{
name: "no compound operators",
input: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER Color EQ Red AS Color`,
},
{
name: "no brackets in single level same-operator chain",
input: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER Color EQ Red OR Color EQ Blue OR Color EQ Green AS Color`,
},
{
name: "no brackets aroung higher precedence op",
input: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER Color EQ Red OR Color EQ Blue AND Color NE Green AS Color`,
},
{
name: "no brackets aroung higher precedence op, even if present in the input",
input: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER Color EQ Red OR (Color EQ Blue AND Color NE Green) AS Color`,
output: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER Color EQ Red OR Color EQ Blue AND Color NE Green AS Color`,
},
{
name: "brackets aroung lower precedence op",
input: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER (Color EQ Red OR Color EQ Blue) AND Color NE Green AS Color`,
},
{
name: "no extra brackets for bracketed same-operator chain",
input: `REP 1
CBF 1
SELECT 1 FROM Color
FILTER (Color EQ Red OR Color EQ Blue OR Color EQ Yellow) AND Color NE Green AS Color`,
},
}
for _, tc := range testCases {
var p PlacementPolicy
require.NoError(t, p.DecodeString(tc.input))
var sb strings.Builder
require.NoError(t, p.WriteStringTo(&sb))
if tc.output == "" {
require.Equal(t, tc.input, sb.String())
} else {
require.Equal(t, tc.output, sb.String())
var p1 PlacementPolicy
require.NoError(t, p1.DecodeString(tc.output))
require.Equal(t, p, p1)
}
}
}
func TestDecodeSelectFilterExpr(t *testing.T) { func TestDecodeSelectFilterExpr(t *testing.T) {
for _, s := range []string{ for _, s := range []string{
"SELECT 1 FROM *", "SELECT 1 FROM *",

View file

@ -18,8 +18,8 @@ func TestChannelTarget(t *testing.T) {
tt := new(testTarget) tt := new(testTarget)
ct := NewChannelTarget(ch) ct := NewChannelTarget(ch)
chTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return ct }) chTarget, _ := newPayloadSizeLimiter(maxSize, 0, func() ObjectWriter { return ct })
testTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return tt }) testTarget, _ := newPayloadSizeLimiter(maxSize, 0, func() ObjectWriter { return tt })
ver := version.Current() ver := version.Current()
cnr := cidtest.ID() cnr := cidtest.ID()

View file

@ -0,0 +1,72 @@
package transformer
import (
"context"
"crypto/rand"
"math"
"testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
func TestTransformerSizeHintCorrectness(t *testing.T) {
const (
maxSize = 100
payloadSize = maxSize*2 + maxSize/2
)
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
p := Params{
Key: &pk.PrivateKey,
NetworkState: dummyEpochSource(123),
MaxSize: maxSize,
WithoutHomomorphicHash: true,
}
cnr := cidtest.ID()
hdr := newObject(cnr)
var owner user.ID
user.IDFromKey(&owner, pk.PrivateKey.PublicKey)
hdr.SetOwnerID(&owner)
expected := make([]byte, payloadSize)
_, _ = rand.Read(expected)
t.Run("default", func(t *testing.T) {
p.SizeHint = 0
testPayloadEqual(t, p, hdr, expected)
})
t.Run("size hint is perfect", func(t *testing.T) {
p.SizeHint = payloadSize
testPayloadEqual(t, p, hdr, expected)
})
t.Run("size hint < payload size", func(t *testing.T) {
p.SizeHint = payloadSize / 2
testPayloadEqual(t, p, hdr, expected)
})
t.Run("size hint > payload size", func(t *testing.T) {
p.SizeHint = math.MaxUint64
testPayloadEqual(t, p, hdr, expected)
})
}
func testPayloadEqual(t *testing.T, p Params, hdr *objectSDK.Object, expected []byte) {
tt := new(testTarget)
p.NextTargetInit = func() ObjectWriter { return tt }
target := NewPayloadSizeLimiter(p)
writeObject(t, context.Background(), target, hdr, expected)
var actual []byte
for i := range tt.objects {
actual = append(actual, tt.objects[i].Payload()...)
}
require.Equal(t, expected, actual)
}

View file

@ -40,6 +40,11 @@ type Params struct {
NetworkState EpochSource NetworkState EpochSource
MaxSize uint64 MaxSize uint64
WithoutHomomorphicHash bool WithoutHomomorphicHash bool
// SizeHint is a hint for the total payload size to be processed.
// It is used primarily to optimize allocations and doesn't affect
// functionality. Primary usecases are providing file size when putting an object
// with the frostfs-cli or using Content-Length header in gateways.
SizeHint uint64
} }
// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length // NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
@ -121,7 +126,18 @@ func (s *payloadSizeLimiter) initializeCurrent() {
s.nextTarget = s.NextTargetInit() s.nextTarget = s.NextTargetInit()
s.writtenCurrent = 0 s.writtenCurrent = 0
s.initPayloadHashers() s.initPayloadHashers()
s.payload = make([]byte, 0)
var payloadSize uint64
// Check whether SizeHint is valid.
if remaining := s.SizeHint - s.written; remaining <= s.SizeHint {
if remaining >= s.MaxSize {
payloadSize = s.MaxSize
} else {
payloadSize = remaining % s.MaxSize
}
}
s.payload = make([]byte, 0, payloadSize)
} }
func (s *payloadSizeLimiter) initPayloadHashers() { func (s *payloadSizeLimiter) initPayloadHashers() {

View file

@ -20,7 +20,7 @@ func TestTransformer(t *testing.T) {
tt := new(testTarget) tt := new(testTarget)
target, pk := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return tt }) target, pk := newPayloadSizeLimiter(maxSize, 0, func() ObjectWriter { return tt })
cnr := cidtest.ID() cnr := cidtest.ID()
hdr := newObject(cnr) hdr := newObject(cnr)
@ -114,15 +114,37 @@ func writeObject(t *testing.T, ctx context.Context, target ChunkedObjectWriter,
func BenchmarkTransformer(b *testing.B) { func BenchmarkTransformer(b *testing.B) {
hdr := newObject(cidtest.ID()) hdr := newObject(cidtest.ID())
const (
// bufferSize is taken from https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/670619d2426fee233a37efe21a0471989b16a4fc/pool/pool.go#L1825
bufferSize = 3 * 1024 * 1024
smallSize = 8 * 1024
bigSize = 64 * 1024 * 1024 * 9 / 2 // 4.5 parts
)
b.Run("small", func(b *testing.B) { b.Run("small", func(b *testing.B) {
benchmarkTransformer(b, hdr, 8*1024) b.Run("no size hint", func(b *testing.B) {
benchmarkTransformer(b, hdr, smallSize, 0, 0)
})
b.Run("no size hint, with buffer", func(b *testing.B) {
benchmarkTransformer(b, hdr, smallSize, 0, bufferSize)
})
b.Run("with size hint, with buffer", func(b *testing.B) {
benchmarkTransformer(b, hdr, smallSize, smallSize, bufferSize)
})
}) })
b.Run("big", func(b *testing.B) { b.Run("big", func(b *testing.B) {
benchmarkTransformer(b, hdr, 64*1024*1024*9/2) b.Run("no size hint", func(b *testing.B) {
benchmarkTransformer(b, hdr, bigSize, 0, 0)
})
b.Run("no size hint, with buffer", func(b *testing.B) {
benchmarkTransformer(b, hdr, bigSize, 0, bufferSize)
})
b.Run("with size hint, with buffer", func(b *testing.B) {
benchmarkTransformer(b, hdr, bigSize, bigSize, bufferSize)
})
}) })
} }
func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize int) { func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize, sizeHint, bufferSize int) {
const maxSize = 64 * 1024 * 1024 const maxSize = 64 * 1024 * 1024
payload := make([]byte, payloadSize) payload := make([]byte, payloadSize)
@ -131,12 +153,24 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
f, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return benchTarget{} }) f, _ := newPayloadSizeLimiter(maxSize, uint64(sizeHint), func() ObjectWriter { return benchTarget{} })
if err := f.WriteHeader(ctx, header); err != nil { if err := f.WriteHeader(ctx, header); err != nil {
b.Fatalf("write header: %v", err) b.Fatalf("write header: %v", err)
} }
if _, err := f.Write(ctx, payload); err != nil { if bufferSize == 0 {
b.Fatalf("write: %v", err) if _, err := f.Write(ctx, payload); err != nil {
b.Fatalf("write: %v", err)
}
} else {
j := 0
for ; j+bufferSize < payloadSize; j += bufferSize {
if _, err := f.Write(ctx, payload[j:j+bufferSize]); err != nil {
b.Fatalf("write: %v", err)
}
}
if _, err := f.Write(ctx, payload[j:payloadSize]); err != nil {
b.Fatalf("write: %v", err)
}
} }
if _, err := f.Close(ctx); err != nil { if _, err := f.Close(ctx); err != nil {
b.Fatalf("close: %v", err) b.Fatalf("close: %v", err)
@ -144,7 +178,7 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in
} }
} }
func newPayloadSizeLimiter(maxSize uint64, nextTarget TargetInitializer) (ChunkedObjectWriter, *keys.PrivateKey) { func newPayloadSizeLimiter(maxSize uint64, sizeHint uint64, nextTarget TargetInitializer) (ChunkedObjectWriter, *keys.PrivateKey) {
p, err := keys.NewPrivateKey() p, err := keys.NewPrivateKey()
if err != nil { if err != nil {
panic(err) panic(err)
@ -155,6 +189,7 @@ func newPayloadSizeLimiter(maxSize uint64, nextTarget TargetInitializer) (Chunke
NextTargetInit: nextTarget, NextTargetInit: nextTarget,
NetworkState: dummyEpochSource(123), NetworkState: dummyEpochSource(123),
MaxSize: maxSize, MaxSize: maxSize,
SizeHint: sizeHint,
WithoutHomomorphicHash: true, WithoutHomomorphicHash: true,
}), p }), p
} }

View file

@ -110,16 +110,11 @@ func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) err
} }
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error { func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
var cliPrm sdkClient.PrmObjectPutInit cliPrm := sdkClient.PrmObjectPutInit{
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber) CopiesNumber: it.prm.copiesNumber,
if it.prm.stoken != nil { Session: it.prm.stoken,
cliPrm.WithinSession(*it.prm.stoken) Key: it.prm.key,
} BearerToken: it.prm.btoken,
if it.prm.key != nil {
cliPrm.UseKey(*it.prm.key)
}
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
} }
wrt, err := it.client.ObjectPutInit(ctx, cliPrm) wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
@ -141,16 +136,13 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
if it.useStream { if it.useStream {
return false, nil return false, nil
} }
var cliPrm sdkClient.PrmObjectPutSingle cliPrm := sdkClient.PrmObjectPutSingle{
cliPrm.SetCopiesNumber(it.prm.copiesNumber) CopiesNumber: it.prm.copiesNumber,
cliPrm.UseKey(it.prm.key) Key: it.prm.key,
if it.prm.stoken != nil { Session: it.prm.stoken,
cliPrm.WithinSession(*it.prm.stoken) BearerToken: it.prm.btoken,
Object: o,
} }
if it.prm.btoken != nil {
cliPrm.WithBearerToken(*it.prm.btoken)
}
cliPrm.SetObject(o.ToV2())
res, err := it.client.ObjectPutSingle(ctx, cliPrm) res, err := it.client.ObjectPutSingle(ctx, cliPrm)
if err != nil && status.Code(err) == codes.Unimplemented { if err != nil && status.Code(err) == codes.Unimplemented {

View file

@ -9,7 +9,6 @@ import (
"io" "io"
"math" "math"
"math/rand" "math/rand"
"runtime/trace"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -418,8 +417,9 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (acco
return accounting.Decimal{}, err return accounting.Decimal{}, err
} }
var cliPrm sdkClient.PrmBalanceGet cliPrm := sdkClient.PrmBalanceGet{
cliPrm.SetAccount(prm.account) Account: &prm.account,
}
start := time.Now() start := time.Now()
res, err := cl.BalanceGet(ctx, cliPrm) res, err := cl.BalanceGet(ctx, cliPrm)
@ -503,8 +503,9 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
return nil, err return nil, err
} }
var cliPrm sdkClient.PrmContainerList cliPrm := sdkClient.PrmContainerList{
cliPrm.SetAccount(prm.ownerID) Account: &prm.ownerID,
}
start := time.Now() start := time.Now()
res, err := cl.ContainerList(ctx, cliPrm) res, err := cl.ContainerList(ctx, cliPrm)
@ -687,9 +688,6 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
// objectPut writes object to FrostFS. // objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
ctx, task := trace.NewTask(ctx, "obj.put")
defer task.End()
if prm.bufferMaxSize == 0 { if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut prm.bufferMaxSize = defaultBufferMaxSizeForPut
} }
@ -876,9 +874,6 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
// objectGet returns reader for object. // objectGet returns reader for object.
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
ctx, task := trace.NewTask(ctx, "obj.getinit")
defer task.End()
cl, err := c.getClient() cl, err := c.getClient()
if err != nil { if err != nil {
return ResGetObject{}, err return ResGetObject{}, err
@ -1005,21 +1000,12 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
return ResObjectSearch{}, err return ResObjectSearch{}, err
} }
var cliPrm sdkClient.PrmObjectSearch cliPrm := sdkClient.PrmObjectSearch{
ContainerID: &prm.cnrID,
cliPrm.InContainer(prm.cnrID) Filters: prm.filters,
cliPrm.SetFilters(prm.filters) Session: prm.stoken,
BearerToken: prm.btoken,
if prm.stoken != nil { Key: prm.key,
cliPrm.WithinSession(*prm.stoken)
}
if prm.btoken != nil {
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
} }
res, err := cl.ObjectSearchInit(ctx, cliPrm) res, err := cl.ObjectSearchInit(ctx, cliPrm)

View file

@ -5,7 +5,6 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"runtime/trace"
"sync" "sync"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" apiClient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@ -64,22 +63,16 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
defer c.mu.Unlock() defer c.mu.Unlock()
if c.conn == nil { if c.conn == nil {
reg := trace.StartRegion(ctx, "tree.updatenodehealth.dial")
if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil { if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil {
reg.End()
return false, err return false, err
} }
reg.End()
} }
wasHealthy := c.healthy wasHealthy := c.healthy
reg := trace.StartRegion(ctx, "tree.updatenodehealth.healthcheck")
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
c.healthy = false c.healthy = false
reg.End()
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err) return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
} }
reg.End()
c.healthy = true c.healthy = true

View file

@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"runtime/trace"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -61,7 +60,6 @@ type InitParameters struct {
clientRebalanceInterval time.Duration clientRebalanceInterval time.Duration
nodeParams []pool.NodeParam nodeParams []pool.NodeParam
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
maxRequestAttempts int
} }
// Pool represents virtual connection to the FrostFS tree services network to communicate // Pool represents virtual connection to the FrostFS tree services network to communicate
@ -80,8 +78,6 @@ type Pool struct {
dialOptions []grpc.DialOption dialOptions []grpc.DialOption
logger *zap.Logger logger *zap.Logger
maxRequestAttempts int
startIndicesMtx sync.RWMutex startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed. // startIndices points to the client from which the next request will be executed.
// Since clients are stored in innerPool field we have to use two indices. // Since clients are stored in innerPool field we have to use two indices.
@ -181,7 +177,6 @@ func NewPool(options InitParameters) (*Pool, error) {
nodeRequestTimeout: options.healthcheckTimeout, nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval, clientRebalanceInterval: options.clientRebalanceInterval,
}, },
maxRequestAttempts: options.maxRequestAttempts,
} }
return p, nil return p, nil
@ -273,21 +268,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.dialOptions = opts x.dialOptions = opts
} }
// SetMaxRequestAttempts sets the max attempt to make successful request.
// Default value is 0 that means the number of attempts equals to number of nodes in pool.
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
// GetNodes invokes eponymous method from TreeServiceClient. // GetNodes invokes eponymous method from TreeServiceClient.
// //
// Can return predefined errors: // Can return predefined errors:
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) { func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
ctx, task := trace.NewTask(ctx, "tree.getnodes")
defer task.End()
request := &grpcService.GetNodeByPathRequest{ request := &grpcService.GetNodeByPathRequest{
Body: &grpcService.GetNodeByPathRequest_Body{ Body: &grpcService.GetNodeByPathRequest_Body{
ContainerId: prm.CID[:], ContainerId: prm.CID[:],
@ -311,10 +297,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
} }
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
reg := trace.StartRegion(ctx, "tree.getnodes.single")
defer reg.End()
resp, inErr = client.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
@ -411,7 +394,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
} }
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { }); err != nil {
@ -427,9 +410,6 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
ctx, task := trace.NewTask(ctx, "tree.addnode")
defer task.End()
request := &grpcService.AddRequest{ request := &grpcService.AddRequest{
Body: &grpcService.AddRequest_Body{ Body: &grpcService.AddRequest_Body{
ContainerId: prm.CID[:], ContainerId: prm.CID[:],
@ -449,10 +429,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
} }
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
reg := trace.StartRegion(ctx, "tree.addnode.single")
defer reg.End()
resp, inErr = client.Add(ctx, request) resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { }); err != nil {
@ -468,9 +445,6 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) { func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
ctx, task := trace.NewTask(ctx, "tree.addnodebypath")
defer task.End()
request := &grpcService.AddByPathRequest{ request := &grpcService.AddByPathRequest{
Body: &grpcService.AddByPathRequest_Body{ Body: &grpcService.AddByPathRequest_Body{
ContainerId: prm.CID[:], ContainerId: prm.CID[:],
@ -492,10 +466,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
} }
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
reg := trace.StartRegion(ctx, "tree.addnodebypath.single")
defer reg.End()
resp, inErr = client.AddByPath(ctx, request) resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { }); err != nil {
@ -519,9 +490,6 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
ctx, task := trace.NewTask(ctx, "tree.movenode")
defer task.End()
request := &grpcService.MoveRequest{ request := &grpcService.MoveRequest{
Body: &grpcService.MoveRequest_Body{ Body: &grpcService.MoveRequest_Body{
ContainerId: prm.CID[:], ContainerId: prm.CID[:],
@ -542,10 +510,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
reg := trace.StartRegion(ctx, "tree.movenode.single")
defer reg.End()
if _, err := client.Move(ctx, request); err != nil { if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
@ -559,9 +524,6 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
ctx, task := trace.NewTask(ctx, "tree.removenode")
defer task.End()
request := &grpcService.RemoveRequest{ request := &grpcService.RemoveRequest{
Body: &grpcService.RemoveRequest_Body{ Body: &grpcService.RemoveRequest_Body{
ContainerId: prm.CID[:], ContainerId: prm.CID[:],
@ -579,10 +541,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err return err
} }
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
reg := trace.StartRegion(ctx, "tree.removenode.single")
defer reg.End()
if _, err := client.Remove(ctx, request); err != nil { if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
@ -669,10 +628,6 @@ func fillDefaultInitParams(params *InitParameters) {
if params.nodeStreamTimeout <= 0 { if params.nodeStreamTimeout <= 0 {
params.nodeStreamTimeout = defaultStreamTimeout params.nodeStreamTimeout = defaultStreamTimeout
} }
if params.maxRequestAttempts <= 0 {
params.maxRequestAttempts = len(params.nodeParams)
}
} }
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
@ -704,9 +659,6 @@ func (p *Pool) startRebalance(ctx context.Context) {
} }
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) { func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
ctx, task := trace.NewTask(ctx, "tree.updatenodehealth")
defer task.End()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i, inner := range p.innerPools { for i, inner := range p.innerPools {
wg.Add(1) wg.Add(1)
@ -776,31 +728,19 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock() p.startIndicesMtx.Unlock()
} }
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
var ( var (
err, finErr error err, finErr error
cl grpcService.TreeServiceClient cl grpcService.TreeServiceClient
) )
reqID := GetRequestID(ctx)
startI, startJ := p.getStartIndices() startI, startJ := p.getStartIndices()
groupsLen := len(p.innerPools) groupsLen := len(p.innerPools)
attempts := p.maxRequestAttempts
for i := startI; i < startI+groupsLen; i++ { for i := startI; i < startI+groupsLen; i++ {
indexI := i % groupsLen indexI := i % groupsLen
clientsLen := len(p.innerPools[indexI].clients) clientsLen := len(p.innerPools[indexI].clients)
for j := startJ; j < startJ+clientsLen; j++ { for j := startJ; j < startJ+clientsLen; j++ {
indexJ := j % clientsLen indexJ := j % clientsLen
if attempts == 0 {
if startI != indexI || startJ != indexJ {
p.setStartIndices(indexI, indexJ)
}
break
}
attempts--
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil { if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
err = fn(cl) err = fn(cl)
} }
@ -810,10 +750,8 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.
} }
return err return err
} }
finErr = finalError(finErr, err) finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts), p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
} }
startJ = 0 startJ = 0
} }
@ -834,9 +772,6 @@ func prioErr(err error) int {
case errors.Is(err, ErrNodeNotFound) || case errors.Is(err, ErrNodeNotFound) ||
errors.Is(err, errNodeEmptyResult): errors.Is(err, errNodeEmptyResult):
return 200 return 200
case errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded):
return 250
case errors.Is(err, ErrUnhealthyEndpoint): case errors.Is(err, ErrUnhealthyEndpoint):
return 300 return 300
default: default:
@ -856,16 +791,3 @@ func finalError(current, candidate error) error {
return current return current
} }
type reqKeyType string
const reqIDKey = reqKeyType("request_id")
func SetRequestID(ctx context.Context, reqID string) context.Context {
return context.WithValue(ctx, reqIDKey, reqID)
}
func GetRequestID(ctx context.Context) string {
reqID, _ := ctx.Value(reqIDKey).(string)
return reqID
}

View file

@ -77,21 +77,14 @@ func TestHandleError(t *testing.T) {
} }
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
ctx := context.Background()
nodes := [][]string{ nodes := [][]string{
{"node00", "node01", "node02", "node03"}, {"node00", "node01", "node02", "node03"},
{"node10", "node11", "node12", "node13"}, {"node10", "node11", "node12", "node13"},
} }
var lenNodes int
for i := range nodes {
lenNodes += len(nodes[i])
}
p := &Pool{ p := &Pool{
logger: zaptest.NewLogger(t), logger: zaptest.NewLogger(t),
innerPools: makeInnerPool(nodes), innerPools: makeInnerPool(nodes),
maxRequestAttempts: lenNodes,
} }
makeFn := func(client grpcService.TreeServiceClient) error { makeFn := func(client grpcService.TreeServiceClient) error {
@ -99,14 +92,14 @@ func TestRetry(t *testing.T) {
} }
t.Run("first ok", func(t *testing.T) { t.Run("first ok", func(t *testing.T) {
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("first failed", func(t *testing.T) { t.Run("first failed", func(t *testing.T) {
setErrors(p, "node00") setErrors(p, "node00")
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 1) checkIndicesAndReset(t, p, 0, 1)
}) })
@ -114,7 +107,7 @@ func TestRetry(t *testing.T) {
t.Run("all failed", func(t *testing.T) { t.Run("all failed", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.Error(t, err) require.Error(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -122,13 +115,13 @@ func TestRetry(t *testing.T) {
t.Run("round", func(t *testing.T) { t.Run("round", func(t *testing.T) {
setErrors(p, nodes[0][0], nodes[0][1]) setErrors(p, nodes[0][0], nodes[0][1])
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndices(t, p, 0, 2) checkIndices(t, p, 0, 2)
resetClientsErrors(p) resetClientsErrors(p)
setErrors(p, nodes[0][2], nodes[0][3]) setErrors(p, nodes[0][2], nodes[0][3])
err = p.requestWithRetry(ctx, makeFn) err = p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -136,14 +129,14 @@ func TestRetry(t *testing.T) {
t.Run("group switch", func(t *testing.T) { t.Run("group switch", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
setErrors(p, nodes[1][0]) setErrors(p, nodes[1][0])
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 1) checkIndicesAndReset(t, p, 1, 1)
}) })
t.Run("group round", func(t *testing.T) { t.Run("group round", func(t *testing.T) {
setErrors(p, nodes[0][1:]...) setErrors(p, nodes[0][1:]...)
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
@ -151,7 +144,7 @@ func TestRetry(t *testing.T) {
t.Run("group round switch", func(t *testing.T) { t.Run("group round switch", func(t *testing.T) {
setErrors(p, nodes[0]...) setErrors(p, nodes[0]...)
p.setStartIndices(0, 1) p.setStartIndices(0, 1)
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 0) checkIndicesAndReset(t, p, 1, 0)
}) })
@ -159,14 +152,14 @@ func TestRetry(t *testing.T) {
t.Run("no panic group switch", func(t *testing.T) { t.Run("no panic group switch", func(t *testing.T) {
setErrors(p, nodes[1]...) setErrors(p, nodes[1]...)
p.setStartIndices(1, 0) p.setStartIndices(1, 0)
err := p.requestWithRetry(ctx, makeFn) err := p.requestWithRetry(makeFn)
require.NoError(t, err) require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("error empty result", func(t *testing.T) { t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes { if index < errNodes {
index++ index++
return errNodeEmptyResult return errNodeEmptyResult
@ -179,7 +172,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) { t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
if index < errNodes { if index < errNodes {
index++ index++
return ErrNodeNotFound return ErrNodeNotFound
@ -192,7 +185,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) { t.Run("error access denied", func(t *testing.T) {
var index int var index int
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
index++ index++
return ErrNodeAccessDenied return ErrNodeAccessDenied
}) })
@ -200,18 +193,6 @@ func TestRetry(t *testing.T) {
require.Equal(t, 1, index) require.Equal(t, 1, index)
checkIndicesAndReset(t, p, 0, 0) checkIndicesAndReset(t, p, 0, 0)
}) })
t.Run("limit attempts", func(t *testing.T) {
oldVal := p.maxRequestAttempts
p.maxRequestAttempts = 2
setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn)
require.Error(t, err)
checkIndicesAndReset(t, p, 0, 2)
p.maxRequestAttempts = oldVal
})
} }
func TestRebalance(t *testing.T) { func TestRebalance(t *testing.T) {