forked from TrueCloudLab/frostfs-sdk-go
Compare commits
13 commits
poc/limit_
...
master
Author | SHA1 | Date | |
---|---|---|---|
6fbe1595cb | |||
a9237aabd2 | |||
a487033505 | |||
51c3618850 | |||
665e5807bc | |||
a02c0bfac8 | |||
20d325e307 | |||
670619d242 | |||
0d79d10482 | |||
9727beb47d | |||
84315fab6a | |||
71335489ae | |||
4c1feaf2cb |
35 changed files with 463 additions and 325 deletions
|
@ -8,17 +8,24 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
- name: golangci-lint
|
- name: Set up Go
|
||||||
uses: https://github.com/golangci/golangci-lint-action@v2
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
version: latest
|
go-version: '1.21'
|
||||||
|
cache: true
|
||||||
|
|
||||||
|
- name: Install linters
|
||||||
|
run: make lint-install
|
||||||
|
|
||||||
|
- name: Run linters
|
||||||
|
run: make lint
|
||||||
|
|
||||||
tests:
|
tests:
|
||||||
name: Tests
|
name: Tests
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go_versions: [ '1.19', '1.20' ]
|
go_versions: [ '1.20', '1.21' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
|
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -27,3 +27,7 @@ antlr-*.jar
|
||||||
|
|
||||||
# tempfiles
|
# tempfiles
|
||||||
.cache
|
.cache
|
||||||
|
|
||||||
|
# binary
|
||||||
|
bin/
|
||||||
|
release/
|
||||||
|
|
21
Makefile
21
Makefile
|
@ -1,6 +1,11 @@
|
||||||
#!/usr/bin/make -f
|
#!/usr/bin/make -f
|
||||||
|
|
||||||
ANTLR_VERSION="4.13.0"
|
ANTLR_VERSION="4.13.0"
|
||||||
|
TMP_DIR := .cache
|
||||||
|
LINT_VERSION ?= 1.55.0
|
||||||
|
TRUECLOUDLAB_LINT_VERSION ?= 0.0.2
|
||||||
|
OUTPUT_LINT_DIR ?= $(shell pwd)/bin
|
||||||
|
LINT_DIR = $(OUTPUT_LINT_DIR)/golangci-lint-$(LINT_VERSION)-v$(TRUECLOUDLAB_LINT_VERSION)
|
||||||
|
|
||||||
# Run tests
|
# Run tests
|
||||||
test:
|
test:
|
||||||
|
@ -15,9 +20,23 @@ dep:
|
||||||
@CGO_ENABLED=0 \
|
@CGO_ENABLED=0 \
|
||||||
go mod tidy -v && echo OK
|
go mod tidy -v && echo OK
|
||||||
|
|
||||||
|
# Install linters
|
||||||
|
lint-install:
|
||||||
|
@mkdir -p $(TMP_DIR)
|
||||||
|
@rm -rf $(TMP_DIR)/linters
|
||||||
|
@git -c advice.detachedHead=false clone --branch v$(TRUECLOUDLAB_LINT_VERSION) https://git.frostfs.info/TrueCloudLab/linters.git $(TMP_DIR)/linters
|
||||||
|
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
|
||||||
|
@rm -rf $(TMP_DIR)/linters
|
||||||
|
@rmdir $(TMP_DIR) 2>/dev/null || true
|
||||||
|
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
||||||
|
|
||||||
# Run linters
|
# Run linters
|
||||||
lint:
|
lint:
|
||||||
@golangci-lint --timeout=5m run
|
@if [ ! -d "$(LINT_DIR)" ]; then \
|
||||||
|
echo "Run make lint-install"; \
|
||||||
|
exit 1; \
|
||||||
|
fi
|
||||||
|
$(LINT_DIR)/golangci-lint run
|
||||||
|
|
||||||
# Run tests with race detection and produce coverage output
|
# Run tests with race detection and produce coverage output
|
||||||
cover:
|
cover:
|
||||||
|
|
|
@ -42,7 +42,6 @@ Contains client for working with FrostFS.
|
||||||
```go
|
```go
|
||||||
var prmInit client.PrmInit
|
var prmInit client.PrmInit
|
||||||
prmInit.SetDefaultPrivateKey(key) // private key for request signing
|
prmInit.SetDefaultPrivateKey(key) // private key for request signing
|
||||||
prmInit.ResolveFrostFSFailures() // enable erroneous status parsing
|
|
||||||
|
|
||||||
var c client.Client
|
var c client.Client
|
||||||
c.Init(prmInit)
|
c.Init(prmInit)
|
||||||
|
@ -77,8 +76,7 @@ if needed and perform any desired action. In the case above we may want to repor
|
||||||
these details to the user as well as retry an operation, possibly with different parameters.
|
these details to the user as well as retry an operation, possibly with different parameters.
|
||||||
Status wire-format is extendable and each node can report any set of details it wants.
|
Status wire-format is extendable and each node can report any set of details it wants.
|
||||||
The set of reserved status codes can be found in
|
The set of reserved status codes can be found in
|
||||||
[FrostFS API](https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/status/types.proto). There is also
|
[FrostFS API](https://git.frostfs.info/TrueCloudLab/frostfs-api/src/branch/master/status/types.proto).
|
||||||
a `client.PrmInit.ResolveFrostFSFailures()` to seamlessly convert erroneous statuses into Go error type.
|
|
||||||
|
|
||||||
### policy
|
### policy
|
||||||
Contains helpers allowing conversion of placing policy from/to JSON representation
|
Contains helpers allowing conversion of placing policy from/to JSON representation
|
||||||
|
|
|
@ -17,26 +17,26 @@ import (
|
||||||
|
|
||||||
// PrmBalanceGet groups parameters of BalanceGet operation.
|
// PrmBalanceGet groups parameters of BalanceGet operation.
|
||||||
type PrmBalanceGet struct {
|
type PrmBalanceGet struct {
|
||||||
prmCommonMeta
|
XHeaders []string
|
||||||
|
|
||||||
accountSet bool
|
Account *user.ID
|
||||||
account user.ID
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAccount sets identifier of the FrostFS account for which the balance is requested.
|
// SetAccount sets identifier of the FrostFS account for which the balance is requested.
|
||||||
// Required parameter.
|
// Required parameter.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmBalanceGet.Account instead.
|
||||||
func (x *PrmBalanceGet) SetAccount(id user.ID) {
|
func (x *PrmBalanceGet) SetAccount(id user.ID) {
|
||||||
x.account = id
|
x.Account = &id
|
||||||
x.accountSet = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *PrmBalanceGet) buildRequest(c *Client) (*v2accounting.BalanceRequest, error) {
|
func (x *PrmBalanceGet) buildRequest(c *Client) (*v2accounting.BalanceRequest, error) {
|
||||||
if !x.accountSet {
|
if x.Account == nil {
|
||||||
return nil, errorAccountNotSet
|
return nil, errorAccountNotSet
|
||||||
}
|
}
|
||||||
|
|
||||||
var accountV2 refs.OwnerID
|
var accountV2 refs.OwnerID
|
||||||
x.account.WriteToV2(&accountV2)
|
x.Account.WriteToV2(&accountV2)
|
||||||
|
|
||||||
var body v2accounting.BalanceRequestBody
|
var body v2accounting.BalanceRequestBody
|
||||||
body.SetOwnerID(&accountV2)
|
body.SetOwnerID(&accountV2)
|
||||||
|
@ -64,9 +64,9 @@ func (x ResBalanceGet) Amount() accounting.Decimal {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`,
|
// Any client's internal or transport errors are returned as `error`,
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmBalanceGet docs).
|
// Returns an error if parameters are set incorrectly (see PrmBalanceGet docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -144,7 +144,7 @@ func (c *Client) Close() error {
|
||||||
//
|
//
|
||||||
// See also Init.
|
// See also Init.
|
||||||
type PrmInit struct {
|
type PrmInit struct {
|
||||||
resolveFrostFSErrors bool
|
disableFrostFSErrorResolution bool
|
||||||
|
|
||||||
key ecdsa.PrivateKey
|
key ecdsa.PrivateKey
|
||||||
|
|
||||||
|
@ -161,12 +161,16 @@ func (x *PrmInit) SetDefaultPrivateKey(key ecdsa.PrivateKey) {
|
||||||
x.key = key
|
x.key = key
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResolveFrostFSFailures makes the Client to resolve failure statuses of the
|
// Deprecated: method is no-op. Option is default.
|
||||||
// FrostFS protocol into Go built-in errors. These errors are returned from
|
|
||||||
// each protocol operation. By default, statuses aren't resolved and written
|
|
||||||
// to the resulting structure (see corresponding Res* docs).
|
|
||||||
func (x *PrmInit) ResolveFrostFSFailures() {
|
func (x *PrmInit) ResolveFrostFSFailures() {
|
||||||
x.resolveFrostFSErrors = true
|
}
|
||||||
|
|
||||||
|
// DisableFrostFSFailuresResolution makes the Client to preserve failure statuses of the
|
||||||
|
// FrostFS protocol only in resulting structure (see corresponding Res* docs).
|
||||||
|
// These errors are returned from each protocol operation. By default, statuses
|
||||||
|
// are resolved and returned as a Go built-in errors.
|
||||||
|
func (x *PrmInit) DisableFrostFSFailuresResolution() {
|
||||||
|
x.disableFrostFSErrorResolution = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetResponseInfoCallback makes the Client to pass ResponseMetaInfo from each
|
// SetResponseInfoCallback makes the Client to pass ResponseMetaInfo from each
|
||||||
|
|
|
@ -24,24 +24,6 @@ func (x statusRes) Status() apistatus.Status {
|
||||||
return x.st
|
return x.st
|
||||||
}
|
}
|
||||||
|
|
||||||
// groups meta parameters shared between all Client operations.
|
|
||||||
type prmCommonMeta struct {
|
|
||||||
// FrostFS request X-Headers
|
|
||||||
xHeaders []string
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
|
||||||
// to be attached to the request. Must have an even length.
|
|
||||||
//
|
|
||||||
// Slice must not be mutated until the operation completes.
|
|
||||||
func (x *prmCommonMeta) WithXHeaders(hs ...string) {
|
|
||||||
if len(hs)%2 != 0 {
|
|
||||||
panic("slice of X-Headers with odd length")
|
|
||||||
}
|
|
||||||
|
|
||||||
x.xHeaders = hs
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
|
func writeXHeadersToMeta(xHeaders []string, h *v2session.RequestMetaHeader) {
|
||||||
if len(xHeaders) == 0 {
|
if len(xHeaders) == 0 {
|
||||||
return
|
return
|
||||||
|
@ -119,7 +101,7 @@ func (c *Client) processResponse(resp responseV2) (apistatus.Status, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
st := apistatus.FromStatusV2(resp.GetMetaHeader().GetStatus())
|
st := apistatus.FromStatusV2(resp.GetMetaHeader().GetStatus())
|
||||||
if c.prm.resolveFrostFSErrors {
|
if !c.prm.disableFrostFSErrorResolution {
|
||||||
return st, apistatus.ErrFromStatus(st)
|
return st, apistatus.ErrFromStatus(st)
|
||||||
}
|
}
|
||||||
return st, nil
|
return st, nil
|
||||||
|
|
|
@ -101,9 +101,9 @@ type ResContainerDelete struct {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
||||||
// The required time is also not predictable.
|
// The required time is also not predictable.
|
||||||
|
|
|
@ -68,9 +68,9 @@ func (x ResContainerEACL) Table() eacl.Table {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmContainerEACL docs).
|
// Returns an error if parameters are set incorrectly (see PrmContainerEACL docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -71,9 +71,9 @@ func (x ResContainerGet) Container() container.Container {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmContainerGet docs).
|
// Returns an error if parameters are set incorrectly (see PrmContainerGet docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -17,26 +17,26 @@ import (
|
||||||
|
|
||||||
// PrmContainerList groups parameters of ContainerList operation.
|
// PrmContainerList groups parameters of ContainerList operation.
|
||||||
type PrmContainerList struct {
|
type PrmContainerList struct {
|
||||||
prmCommonMeta
|
XHeaders []string
|
||||||
|
|
||||||
ownerSet bool
|
Account *user.ID
|
||||||
ownerID user.ID
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAccount sets identifier of the FrostFS account to list the containers.
|
// SetAccount sets identifier of the FrostFS account to list the containers.
|
||||||
// Required parameter.
|
// Required parameter.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmContainerList.Account instead.
|
||||||
func (x *PrmContainerList) SetAccount(id user.ID) {
|
func (x *PrmContainerList) SetAccount(id user.ID) {
|
||||||
x.ownerID = id
|
x.Account = &id
|
||||||
x.ownerSet = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *PrmContainerList) buildRequest(c *Client) (*v2container.ListRequest, error) {
|
func (x *PrmContainerList) buildRequest(c *Client) (*v2container.ListRequest, error) {
|
||||||
if !x.ownerSet {
|
if x.Account == nil {
|
||||||
return nil, errorAccountNotSet
|
return nil, errorAccountNotSet
|
||||||
}
|
}
|
||||||
|
|
||||||
var ownerV2 refs.OwnerID
|
var ownerV2 refs.OwnerID
|
||||||
x.ownerID.WriteToV2(&ownerV2)
|
x.Account.WriteToV2(&ownerV2)
|
||||||
|
|
||||||
reqBody := new(v2container.ListRequestBody)
|
reqBody := new(v2container.ListRequestBody)
|
||||||
reqBody.SetOwnerID(&ownerV2)
|
reqBody.SetOwnerID(&ownerV2)
|
||||||
|
@ -65,9 +65,9 @@ func (x ResContainerList) Containers() []cid.ID {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmContainerList docs).
|
// Returns an error if parameters are set incorrectly (see PrmContainerList docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -110,9 +110,9 @@ func (x ResContainerPut) ID() cid.ID {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
||||||
// The required time is also not predictable.
|
// The required time is also not predictable.
|
||||||
|
|
|
@ -101,9 +101,9 @@ type ResContainerSetEACL struct {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
||||||
// The required time is also not predictable.
|
// The required time is also not predictable.
|
||||||
|
|
|
@ -57,9 +57,9 @@ type ResAnnounceSpace struct {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
||||||
// The required time is also not predictable.
|
// The required time is also not predictable.
|
||||||
|
|
|
@ -53,9 +53,9 @@ func (x ResEndpointInfo) NodeInfo() netmap.NodeInfo {
|
||||||
// Method can be used as a health check to see if node is alive and responds to requests.
|
// Method can be used as a health check to see if node is alive and responds to requests.
|
||||||
//
|
//
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmEndpointInfo docs).
|
// Returns an error if parameters are set incorrectly (see PrmEndpointInfo docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
@ -140,9 +140,9 @@ func (x ResNetworkInfo) Info() netmap.NetworkInfo {
|
||||||
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
|
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
|
||||||
//
|
//
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmNetworkInfo docs).
|
// Returns an error if parameters are set incorrectly (see PrmNetworkInfo docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
@ -204,9 +204,9 @@ func (x ResNetMapSnapshot) NetMap() netmap.NetMap {
|
||||||
// NetMapSnapshot requests current network view of the remote server.
|
// NetMapSnapshot requests current network view of the remote server.
|
||||||
//
|
//
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly.
|
// Returns an error if parameters are set incorrectly.
|
||||||
// Context is required and MUST NOT be nil. It is used for network communication.
|
// Context is required and MUST NOT be nil. It is used for network communication.
|
||||||
|
|
|
@ -67,6 +67,7 @@ func TestClient_NetMapSnapshot(t *testing.T) {
|
||||||
var res *ResNetMapSnapshot
|
var res *ResNetMapSnapshot
|
||||||
var srv serverNetMap
|
var srv serverNetMap
|
||||||
c := newClient(&srv)
|
c := newClient(&srv)
|
||||||
|
c.prm.DisableFrostFSFailuresResolution()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// request signature
|
// request signature
|
||||||
|
|
|
@ -112,9 +112,9 @@ func (prm *PrmObjectDelete) buildRequest(c *Client) (*v2object.DeleteRequest, er
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`,
|
// Any client's internal or transport errors are returned as `error`,
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmObjectDelete docs).
|
// Returns an error if parameters are set incorrectly (see PrmObjectDelete docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -444,9 +444,9 @@ func (prm *PrmObjectHead) buildRequest(c *Client) (*v2object.HeadRequest, error)
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`,
|
// Any client's internal or transport errors are returned as `error`,
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmObjectHead docs).
|
// Returns an error if parameters are set incorrectly (see PrmObjectHead docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -152,9 +152,9 @@ func (prm *PrmObjectHash) buildRequest(c *Client) (*v2object.GetRangeHashRequest
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`,
|
// Any client's internal or transport errors are returned as `error`,
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmObjectHash docs).
|
// Returns an error if parameters are set incorrectly (see PrmObjectHash docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -83,7 +83,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err
|
||||||
wrt.WritePayloadChunk(ctx, o.Payload())
|
wrt.WritePayloadChunk(ctx, o.Payload())
|
||||||
}
|
}
|
||||||
it.res, err = wrt.Close(ctx)
|
it.res, err = wrt.Close(ctx)
|
||||||
if err == nil && !it.client.prm.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.st) {
|
if err == nil && it.client.prm.disableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
|
||||||
err = apistatus.ErrFromStatus(it.res.st)
|
err = apistatus.ErrFromStatus(it.res.st)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -115,7 +115,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
|
||||||
statusRes: res.statusRes,
|
statusRes: res.statusRes,
|
||||||
obj: id,
|
obj: id,
|
||||||
}
|
}
|
||||||
if !it.client.prm.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.st) {
|
if it.client.prm.disableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
|
||||||
return true, apistatus.ErrFromStatus(it.res.st)
|
return true, apistatus.ErrFromStatus(it.res.st)
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|
|
@ -24,19 +24,26 @@ import (
|
||||||
|
|
||||||
// PrmObjectSearch groups parameters of ObjectSearch operation.
|
// PrmObjectSearch groups parameters of ObjectSearch operation.
|
||||||
type PrmObjectSearch struct {
|
type PrmObjectSearch struct {
|
||||||
meta v2session.RequestMetaHeader
|
XHeaders []string
|
||||||
|
|
||||||
key *ecdsa.PrivateKey
|
Local bool
|
||||||
|
|
||||||
cnrSet bool
|
BearerToken *bearer.Token
|
||||||
cnrID cid.ID
|
|
||||||
|
|
||||||
filters object.SearchFilters
|
Session *session.Object
|
||||||
|
|
||||||
|
ContainerID *cid.ID
|
||||||
|
|
||||||
|
Key *ecdsa.PrivateKey
|
||||||
|
|
||||||
|
Filters object.SearchFilters
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkLocal tells the server to execute the operation locally.
|
// MarkLocal tells the server to execute the operation locally.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.Local instead.
|
||||||
func (x *PrmObjectSearch) MarkLocal() {
|
func (x *PrmObjectSearch) MarkLocal() {
|
||||||
x.meta.SetTTL(1)
|
x.Local = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithinSession specifies session within which the search query must be executed.
|
// WithinSession specifies session within which the search query must be executed.
|
||||||
|
@ -45,10 +52,10 @@ func (x *PrmObjectSearch) MarkLocal() {
|
||||||
// This may affect the execution of an operation (e.g. access control).
|
// This may affect the execution of an operation (e.g. access control).
|
||||||
//
|
//
|
||||||
// Must be signed.
|
// Must be signed.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.Session instead.
|
||||||
func (x *PrmObjectSearch) WithinSession(t session.Object) {
|
func (x *PrmObjectSearch) WithinSession(t session.Object) {
|
||||||
var tokv2 v2session.Token
|
x.Session = &t
|
||||||
t.WriteToV2(&tokv2)
|
|
||||||
x.meta.SetSessionToken(&tokv2)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithBearerToken attaches bearer token to be used for the operation.
|
// WithBearerToken attaches bearer token to be used for the operation.
|
||||||
|
@ -56,37 +63,44 @@ func (x *PrmObjectSearch) WithinSession(t session.Object) {
|
||||||
// If set, underlying eACL rules will be used in access control.
|
// If set, underlying eACL rules will be used in access control.
|
||||||
//
|
//
|
||||||
// Must be signed.
|
// Must be signed.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.BearerToken instead.
|
||||||
func (x *PrmObjectSearch) WithBearerToken(t bearer.Token) {
|
func (x *PrmObjectSearch) WithBearerToken(t bearer.Token) {
|
||||||
var v2token acl.BearerToken
|
x.BearerToken = &t
|
||||||
t.WriteToV2(&v2token)
|
|
||||||
x.meta.SetBearerToken(&v2token)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
// WithXHeaders specifies list of extended headers (string key-value pairs)
|
||||||
// to be attached to the request. Must have an even length.
|
// to be attached to the request. Must have an even length.
|
||||||
//
|
//
|
||||||
// Slice must not be mutated until the operation completes.
|
// Slice must not be mutated until the operation completes.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.XHeaders instead.
|
||||||
func (x *PrmObjectSearch) WithXHeaders(hs ...string) {
|
func (x *PrmObjectSearch) WithXHeaders(hs ...string) {
|
||||||
writeXHeadersToMeta(hs, &x.meta)
|
x.XHeaders = hs
|
||||||
}
|
}
|
||||||
|
|
||||||
// UseKey specifies private key to sign the requests.
|
// UseKey specifies private key to sign the requests.
|
||||||
// If key is not provided, then Client default key is used.
|
// If key is not provided, then Client default key is used.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.Key instead.
|
||||||
func (x *PrmObjectSearch) UseKey(key ecdsa.PrivateKey) {
|
func (x *PrmObjectSearch) UseKey(key ecdsa.PrivateKey) {
|
||||||
x.key = &key
|
x.Key = &key
|
||||||
}
|
}
|
||||||
|
|
||||||
// InContainer specifies the container in which to look for objects.
|
// InContainer specifies the container in which to look for objects.
|
||||||
// Required parameter.
|
// Required parameter.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.ContainerID instead.
|
||||||
func (x *PrmObjectSearch) InContainer(id cid.ID) {
|
func (x *PrmObjectSearch) InContainer(id cid.ID) {
|
||||||
x.cnrID = id
|
x.ContainerID = &id
|
||||||
x.cnrSet = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetFilters sets filters by which to select objects. All container objects
|
// SetFilters sets filters by which to select objects. All container objects
|
||||||
// match unset/empty filters.
|
// match unset/empty filters.
|
||||||
|
//
|
||||||
|
// Deprecated: Use PrmObjectSearch.Filters instead.
|
||||||
func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
|
func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
|
||||||
x.filters = filters
|
x.Filters = filters
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResObjectSearch groups the final result values of ObjectSearch operation.
|
// ResObjectSearch groups the final result values of ObjectSearch operation.
|
||||||
|
@ -212,6 +226,48 @@ func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
|
||||||
return &x.res, nil
|
return &x.res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *PrmObjectSearch) buildRequest(c *Client) (*v2object.SearchRequest, error) {
|
||||||
|
if x.ContainerID == nil {
|
||||||
|
return nil, errorMissingContainer
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(x.XHeaders)%2 != 0 {
|
||||||
|
return nil, errorInvalidXHeaders
|
||||||
|
}
|
||||||
|
|
||||||
|
meta := new(v2session.RequestMetaHeader)
|
||||||
|
writeXHeadersToMeta(x.XHeaders, meta)
|
||||||
|
|
||||||
|
if x.BearerToken != nil {
|
||||||
|
v2BearerToken := new(acl.BearerToken)
|
||||||
|
x.BearerToken.WriteToV2(v2BearerToken)
|
||||||
|
meta.SetBearerToken(v2BearerToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.Session != nil {
|
||||||
|
v2SessionToken := new(v2session.Token)
|
||||||
|
x.Session.WriteToV2(v2SessionToken)
|
||||||
|
meta.SetSessionToken(v2SessionToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.Local {
|
||||||
|
meta.SetTTL(1)
|
||||||
|
}
|
||||||
|
cnrV2 := new(v2refs.ContainerID)
|
||||||
|
x.ContainerID.WriteToV2(cnrV2)
|
||||||
|
|
||||||
|
body := new(v2object.SearchRequestBody)
|
||||||
|
body.SetVersion(1)
|
||||||
|
body.SetContainerID(cnrV2)
|
||||||
|
body.SetFilters(x.Filters.ToV2())
|
||||||
|
|
||||||
|
req := new(v2object.SearchRequest)
|
||||||
|
req.SetBody(body)
|
||||||
|
c.prepareRequest(req, meta)
|
||||||
|
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ObjectSearchInit initiates object selection through a remote server using FrostFS API protocol.
|
// ObjectSearchInit initiates object selection through a remote server using FrostFS API protocol.
|
||||||
//
|
//
|
||||||
// The call only opens the transmission channel, explicit fetching of matched objects
|
// The call only opens the transmission channel, explicit fetching of matched objects
|
||||||
|
@ -221,30 +277,17 @@ func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
|
||||||
// Returns an error if parameters are set incorrectly (see PrmObjectSearch docs).
|
// Returns an error if parameters are set incorrectly (see PrmObjectSearch docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*ObjectListReader, error) {
|
func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*ObjectListReader, error) {
|
||||||
// check parameters
|
req, err := prm.buildRequest(c)
|
||||||
if !prm.cnrSet {
|
if err != nil {
|
||||||
return nil, errorMissingContainer
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cidV2 v2refs.ContainerID
|
key := prm.Key
|
||||||
prm.cnrID.WriteToV2(&cidV2)
|
|
||||||
|
|
||||||
var body v2object.SearchRequestBody
|
|
||||||
body.SetVersion(1)
|
|
||||||
body.SetContainerID(&cidV2)
|
|
||||||
body.SetFilters(prm.filters.ToV2())
|
|
||||||
|
|
||||||
// init reader
|
|
||||||
var req v2object.SearchRequest
|
|
||||||
req.SetBody(&body)
|
|
||||||
c.prepareRequest(&req, &prm.meta)
|
|
||||||
|
|
||||||
key := prm.key
|
|
||||||
if key == nil {
|
if key == nil {
|
||||||
key = &c.prm.key
|
key = &c.prm.key
|
||||||
}
|
}
|
||||||
|
|
||||||
err := signature.SignServiceMessage(key, &req)
|
err = signature.SignServiceMessage(key, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("sign request: %w", err)
|
return nil, fmt.Errorf("sign request: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -252,7 +295,7 @@ func (c *Client) ObjectSearchInit(ctx context.Context, prm PrmObjectSearch) (*Ob
|
||||||
var r ObjectListReader
|
var r ObjectListReader
|
||||||
ctx, r.cancelCtxStream = context.WithCancel(ctx)
|
ctx, r.cancelCtxStream = context.WithCancel(ctx)
|
||||||
|
|
||||||
r.stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx))
|
r.stream, err = rpcapi.SearchObjects(&c.c, req, client.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("open stream: %w", err)
|
return nil, fmt.Errorf("open stream: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,9 +89,9 @@ func (x ResSessionCreate) PublicKey() []byte {
|
||||||
//
|
//
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
// Any client's internal or transport errors are returned as `error`.
|
// Any client's internal or transport errors are returned as `error`.
|
||||||
// If PrmInit.ResolveFrostFSFailures has been called, unsuccessful
|
// If PrmInit.DisableFrostFSFailuresResolution has been called, unsuccessful
|
||||||
// FrostFS status codes are returned as `error`, otherwise, are included
|
// FrostFS status codes are included in the returned result structure,
|
||||||
// in the returned result structure.
|
// otherwise, are also returned as `error`.
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly (see PrmSessionCreate docs).
|
// Returns an error if parameters are set incorrectly (see PrmSessionCreate docs).
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
|
|
|
@ -156,11 +156,7 @@ func (a *meanIQRAgg) Compute() float64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *reverseMinNorm) Normalize(w float64) float64 {
|
func (r *reverseMinNorm) Normalize(w float64) float64 {
|
||||||
if w == 0 {
|
return (r.min + 1) / (w + 1)
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.min / w
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *sigmoidNorm) Normalize(w float64) float64 {
|
func (r *sigmoidNorm) Normalize(w float64) float64 {
|
||||||
|
|
|
@ -146,19 +146,19 @@
|
||||||
"select 3 nodes in 3 distinct countries, same placement": {
|
"select 3 nodes in 3 distinct countries, same placement": {
|
||||||
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
|
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":1,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
|
||||||
"pivot": "Y29udGFpbmVySUQ=",
|
"pivot": "Y29udGFpbmVySUQ=",
|
||||||
"result": [[0, 2, 3]],
|
"result": [ [ 5, 0, 7 ] ],
|
||||||
"placement": {
|
"placement": {
|
||||||
"pivot": "b2JqZWN0SUQ=",
|
"pivot": "b2JqZWN0SUQ=",
|
||||||
"result": [[0, 2, 3]]
|
"result": [ [ 5, 0, 7 ] ]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"select 6 nodes in 3 distinct countries, different placement": {
|
"select 6 nodes in 3 distinct countries, different placement": {
|
||||||
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
|
"policy": {"replicas":[{"count":1,"selector":"Main"}],"containerBackupFactor":2,"selectors":[{"name":"Main","count":3,"clause":"DISTINCT","attribute":"Country","filter":"*"}],"filters":[]},
|
||||||
"pivot": "Y29udGFpbmVySUQ=",
|
"pivot": "Y29udGFpbmVySUQ=",
|
||||||
"result": [[0, 1, 2, 6, 3, 4]],
|
"result": [ [ 5, 4, 0, 1, 7, 2 ] ],
|
||||||
"placement": {
|
"placement": {
|
||||||
"pivot": "b2JqZWN0SUQ=",
|
"pivot": "b2JqZWN0SUQ=",
|
||||||
"result": [[0, 1, 2, 6, 3, 4]]
|
"result": [ [ 5, 4, 0, 7, 2, 1 ] ]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -455,7 +455,7 @@ func (p PlacementPolicy) WriteStringTo(w io.StringWriter) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = writeFilterStringTo(w, p.filters[i])
|
err = writeFilterStringTo(w, p.filters[i], false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -464,7 +464,7 @@ func (p PlacementPolicy) WriteStringTo(w io.StringWriter) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
|
func writeFilterStringTo(w io.StringWriter, f netmap.Filter, mayNeedOuterBrackets bool) error {
|
||||||
var err error
|
var err error
|
||||||
var s string
|
var s string
|
||||||
op := f.GetOp()
|
op := f.GetOp()
|
||||||
|
@ -489,7 +489,7 @@ func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = writeFilterStringTo(w, inner[0])
|
err = writeFilterStringTo(w, inner[0], false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -498,6 +498,13 @@ func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
useBrackets := mayNeedOuterBrackets && op == netmap.OR && len(inner) > 1
|
||||||
|
if useBrackets {
|
||||||
|
_, err = w.WriteString("(")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
for i := range inner {
|
for i := range inner {
|
||||||
if i != 0 {
|
if i != 0 {
|
||||||
_, err = w.WriteString(" " + op.String() + " ")
|
_, err = w.WriteString(" " + op.String() + " ")
|
||||||
|
@ -505,7 +512,13 @@ func writeFilterStringTo(w io.StringWriter, f netmap.Filter) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = writeFilterStringTo(w, inner[i])
|
err = writeFilterStringTo(w, inner[i], true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if useBrackets {
|
||||||
|
_, err = w.WriteString(")")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package netmap_test
|
package netmap_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
. "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -29,6 +30,79 @@ func TestPlacementPolicyEncoding(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPlacementPolicyWriteString(t *testing.T) {
|
||||||
|
var testCases = []struct {
|
||||||
|
name string
|
||||||
|
input string
|
||||||
|
output string // If the output is empty, make it equal to input.
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no compound operators",
|
||||||
|
input: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER Color EQ Red AS Color`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no brackets in single level same-operator chain",
|
||||||
|
input: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER Color EQ Red OR Color EQ Blue OR Color EQ Green AS Color`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no brackets aroung higher precedence op",
|
||||||
|
input: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER Color EQ Red OR Color EQ Blue AND Color NE Green AS Color`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no brackets aroung higher precedence op, even if present in the input",
|
||||||
|
input: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER Color EQ Red OR (Color EQ Blue AND Color NE Green) AS Color`,
|
||||||
|
output: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER Color EQ Red OR Color EQ Blue AND Color NE Green AS Color`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "brackets aroung lower precedence op",
|
||||||
|
input: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER (Color EQ Red OR Color EQ Blue) AND Color NE Green AS Color`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no extra brackets for bracketed same-operator chain",
|
||||||
|
input: `REP 1
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM Color
|
||||||
|
FILTER (Color EQ Red OR Color EQ Blue OR Color EQ Yellow) AND Color NE Green AS Color`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
var p PlacementPolicy
|
||||||
|
require.NoError(t, p.DecodeString(tc.input))
|
||||||
|
|
||||||
|
var sb strings.Builder
|
||||||
|
require.NoError(t, p.WriteStringTo(&sb))
|
||||||
|
|
||||||
|
if tc.output == "" {
|
||||||
|
require.Equal(t, tc.input, sb.String())
|
||||||
|
} else {
|
||||||
|
require.Equal(t, tc.output, sb.String())
|
||||||
|
|
||||||
|
var p1 PlacementPolicy
|
||||||
|
require.NoError(t, p1.DecodeString(tc.output))
|
||||||
|
require.Equal(t, p, p1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDecodeSelectFilterExpr(t *testing.T) {
|
func TestDecodeSelectFilterExpr(t *testing.T) {
|
||||||
for _, s := range []string{
|
for _, s := range []string{
|
||||||
"SELECT 1 FROM *",
|
"SELECT 1 FROM *",
|
||||||
|
|
|
@ -18,8 +18,8 @@ func TestChannelTarget(t *testing.T) {
|
||||||
tt := new(testTarget)
|
tt := new(testTarget)
|
||||||
ct := NewChannelTarget(ch)
|
ct := NewChannelTarget(ch)
|
||||||
|
|
||||||
chTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return ct })
|
chTarget, _ := newPayloadSizeLimiter(maxSize, 0, func() ObjectWriter { return ct })
|
||||||
testTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return tt })
|
testTarget, _ := newPayloadSizeLimiter(maxSize, 0, func() ObjectWriter { return tt })
|
||||||
|
|
||||||
ver := version.Current()
|
ver := version.Current()
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
72
object/transformer/size_hint_test.go
Normal file
72
object/transformer/size_hint_test.go
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
package transformer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTransformerSizeHintCorrectness(t *testing.T) {
|
||||||
|
const (
|
||||||
|
maxSize = 100
|
||||||
|
payloadSize = maxSize*2 + maxSize/2
|
||||||
|
)
|
||||||
|
|
||||||
|
pk, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
p := Params{
|
||||||
|
Key: &pk.PrivateKey,
|
||||||
|
NetworkState: dummyEpochSource(123),
|
||||||
|
MaxSize: maxSize,
|
||||||
|
WithoutHomomorphicHash: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
hdr := newObject(cnr)
|
||||||
|
|
||||||
|
var owner user.ID
|
||||||
|
user.IDFromKey(&owner, pk.PrivateKey.PublicKey)
|
||||||
|
hdr.SetOwnerID(&owner)
|
||||||
|
|
||||||
|
expected := make([]byte, payloadSize)
|
||||||
|
_, _ = rand.Read(expected)
|
||||||
|
|
||||||
|
t.Run("default", func(t *testing.T) {
|
||||||
|
p.SizeHint = 0
|
||||||
|
testPayloadEqual(t, p, hdr, expected)
|
||||||
|
})
|
||||||
|
t.Run("size hint is perfect", func(t *testing.T) {
|
||||||
|
p.SizeHint = payloadSize
|
||||||
|
testPayloadEqual(t, p, hdr, expected)
|
||||||
|
})
|
||||||
|
t.Run("size hint < payload size", func(t *testing.T) {
|
||||||
|
p.SizeHint = payloadSize / 2
|
||||||
|
testPayloadEqual(t, p, hdr, expected)
|
||||||
|
})
|
||||||
|
t.Run("size hint > payload size", func(t *testing.T) {
|
||||||
|
p.SizeHint = math.MaxUint64
|
||||||
|
testPayloadEqual(t, p, hdr, expected)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPayloadEqual(t *testing.T, p Params, hdr *objectSDK.Object, expected []byte) {
|
||||||
|
tt := new(testTarget)
|
||||||
|
|
||||||
|
p.NextTargetInit = func() ObjectWriter { return tt }
|
||||||
|
target := NewPayloadSizeLimiter(p)
|
||||||
|
|
||||||
|
writeObject(t, context.Background(), target, hdr, expected)
|
||||||
|
var actual []byte
|
||||||
|
for i := range tt.objects {
|
||||||
|
actual = append(actual, tt.objects[i].Payload()...)
|
||||||
|
}
|
||||||
|
require.Equal(t, expected, actual)
|
||||||
|
}
|
|
@ -40,6 +40,11 @@ type Params struct {
|
||||||
NetworkState EpochSource
|
NetworkState EpochSource
|
||||||
MaxSize uint64
|
MaxSize uint64
|
||||||
WithoutHomomorphicHash bool
|
WithoutHomomorphicHash bool
|
||||||
|
// SizeHint is a hint for the total payload size to be processed.
|
||||||
|
// It is used primarily to optimize allocations and doesn't affect
|
||||||
|
// functionality. Primary usecases are providing file size when putting an object
|
||||||
|
// with the frostfs-cli or using Content-Length header in gateways.
|
||||||
|
SizeHint uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
|
// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
|
||||||
|
@ -121,7 +126,18 @@ func (s *payloadSizeLimiter) initializeCurrent() {
|
||||||
s.nextTarget = s.NextTargetInit()
|
s.nextTarget = s.NextTargetInit()
|
||||||
s.writtenCurrent = 0
|
s.writtenCurrent = 0
|
||||||
s.initPayloadHashers()
|
s.initPayloadHashers()
|
||||||
s.payload = make([]byte, 0)
|
|
||||||
|
var payloadSize uint64
|
||||||
|
|
||||||
|
// Check whether SizeHint is valid.
|
||||||
|
if remaining := s.SizeHint - s.written; remaining <= s.SizeHint {
|
||||||
|
if remaining >= s.MaxSize {
|
||||||
|
payloadSize = s.MaxSize
|
||||||
|
} else {
|
||||||
|
payloadSize = remaining % s.MaxSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.payload = make([]byte, 0, payloadSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) initPayloadHashers() {
|
func (s *payloadSizeLimiter) initPayloadHashers() {
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestTransformer(t *testing.T) {
|
||||||
|
|
||||||
tt := new(testTarget)
|
tt := new(testTarget)
|
||||||
|
|
||||||
target, pk := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return tt })
|
target, pk := newPayloadSizeLimiter(maxSize, 0, func() ObjectWriter { return tt })
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
hdr := newObject(cnr)
|
hdr := newObject(cnr)
|
||||||
|
@ -114,15 +114,37 @@ func writeObject(t *testing.T, ctx context.Context, target ChunkedObjectWriter,
|
||||||
func BenchmarkTransformer(b *testing.B) {
|
func BenchmarkTransformer(b *testing.B) {
|
||||||
hdr := newObject(cidtest.ID())
|
hdr := newObject(cidtest.ID())
|
||||||
|
|
||||||
|
const (
|
||||||
|
// bufferSize is taken from https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/670619d2426fee233a37efe21a0471989b16a4fc/pool/pool.go#L1825
|
||||||
|
bufferSize = 3 * 1024 * 1024
|
||||||
|
smallSize = 8 * 1024
|
||||||
|
bigSize = 64 * 1024 * 1024 * 9 / 2 // 4.5 parts
|
||||||
|
)
|
||||||
b.Run("small", func(b *testing.B) {
|
b.Run("small", func(b *testing.B) {
|
||||||
benchmarkTransformer(b, hdr, 8*1024)
|
b.Run("no size hint", func(b *testing.B) {
|
||||||
|
benchmarkTransformer(b, hdr, smallSize, 0, 0)
|
||||||
|
})
|
||||||
|
b.Run("no size hint, with buffer", func(b *testing.B) {
|
||||||
|
benchmarkTransformer(b, hdr, smallSize, 0, bufferSize)
|
||||||
|
})
|
||||||
|
b.Run("with size hint, with buffer", func(b *testing.B) {
|
||||||
|
benchmarkTransformer(b, hdr, smallSize, smallSize, bufferSize)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
b.Run("big", func(b *testing.B) {
|
b.Run("big", func(b *testing.B) {
|
||||||
benchmarkTransformer(b, hdr, 64*1024*1024*9/2)
|
b.Run("no size hint", func(b *testing.B) {
|
||||||
|
benchmarkTransformer(b, hdr, bigSize, 0, 0)
|
||||||
|
})
|
||||||
|
b.Run("no size hint, with buffer", func(b *testing.B) {
|
||||||
|
benchmarkTransformer(b, hdr, bigSize, 0, bufferSize)
|
||||||
|
})
|
||||||
|
b.Run("with size hint, with buffer", func(b *testing.B) {
|
||||||
|
benchmarkTransformer(b, hdr, bigSize, bigSize, bufferSize)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize int) {
|
func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize, sizeHint, bufferSize int) {
|
||||||
const maxSize = 64 * 1024 * 1024
|
const maxSize = 64 * 1024 * 1024
|
||||||
|
|
||||||
payload := make([]byte, payloadSize)
|
payload := make([]byte, payloadSize)
|
||||||
|
@ -131,12 +153,24 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
f, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return benchTarget{} })
|
f, _ := newPayloadSizeLimiter(maxSize, uint64(sizeHint), func() ObjectWriter { return benchTarget{} })
|
||||||
if err := f.WriteHeader(ctx, header); err != nil {
|
if err := f.WriteHeader(ctx, header); err != nil {
|
||||||
b.Fatalf("write header: %v", err)
|
b.Fatalf("write header: %v", err)
|
||||||
}
|
}
|
||||||
if _, err := f.Write(ctx, payload); err != nil {
|
if bufferSize == 0 {
|
||||||
b.Fatalf("write: %v", err)
|
if _, err := f.Write(ctx, payload); err != nil {
|
||||||
|
b.Fatalf("write: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
j := 0
|
||||||
|
for ; j+bufferSize < payloadSize; j += bufferSize {
|
||||||
|
if _, err := f.Write(ctx, payload[j:j+bufferSize]); err != nil {
|
||||||
|
b.Fatalf("write: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if _, err := f.Write(ctx, payload[j:payloadSize]); err != nil {
|
||||||
|
b.Fatalf("write: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if _, err := f.Close(ctx); err != nil {
|
if _, err := f.Close(ctx); err != nil {
|
||||||
b.Fatalf("close: %v", err)
|
b.Fatalf("close: %v", err)
|
||||||
|
@ -144,7 +178,7 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPayloadSizeLimiter(maxSize uint64, nextTarget TargetInitializer) (ChunkedObjectWriter, *keys.PrivateKey) {
|
func newPayloadSizeLimiter(maxSize uint64, sizeHint uint64, nextTarget TargetInitializer) (ChunkedObjectWriter, *keys.PrivateKey) {
|
||||||
p, err := keys.NewPrivateKey()
|
p, err := keys.NewPrivateKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -155,6 +189,7 @@ func newPayloadSizeLimiter(maxSize uint64, nextTarget TargetInitializer) (Chunke
|
||||||
NextTargetInit: nextTarget,
|
NextTargetInit: nextTarget,
|
||||||
NetworkState: dummyEpochSource(123),
|
NetworkState: dummyEpochSource(123),
|
||||||
MaxSize: maxSize,
|
MaxSize: maxSize,
|
||||||
|
SizeHint: sizeHint,
|
||||||
WithoutHomomorphicHash: true,
|
WithoutHomomorphicHash: true,
|
||||||
}), p
|
}), p
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,16 +110,11 @@ func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
||||||
var cliPrm sdkClient.PrmObjectPutInit
|
cliPrm := sdkClient.PrmObjectPutInit{
|
||||||
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
|
CopiesNumber: it.prm.copiesNumber,
|
||||||
if it.prm.stoken != nil {
|
Session: it.prm.stoken,
|
||||||
cliPrm.WithinSession(*it.prm.stoken)
|
Key: it.prm.key,
|
||||||
}
|
BearerToken: it.prm.btoken,
|
||||||
if it.prm.key != nil {
|
|
||||||
cliPrm.UseKey(*it.prm.key)
|
|
||||||
}
|
|
||||||
if it.prm.btoken != nil {
|
|
||||||
cliPrm.WithBearerToken(*it.prm.btoken)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
|
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
|
||||||
|
@ -141,16 +136,13 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
|
||||||
if it.useStream {
|
if it.useStream {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
var cliPrm sdkClient.PrmObjectPutSingle
|
cliPrm := sdkClient.PrmObjectPutSingle{
|
||||||
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
|
CopiesNumber: it.prm.copiesNumber,
|
||||||
cliPrm.UseKey(it.prm.key)
|
Key: it.prm.key,
|
||||||
if it.prm.stoken != nil {
|
Session: it.prm.stoken,
|
||||||
cliPrm.WithinSession(*it.prm.stoken)
|
BearerToken: it.prm.btoken,
|
||||||
|
Object: o,
|
||||||
}
|
}
|
||||||
if it.prm.btoken != nil {
|
|
||||||
cliPrm.WithBearerToken(*it.prm.btoken)
|
|
||||||
}
|
|
||||||
cliPrm.SetObject(o.ToV2())
|
|
||||||
|
|
||||||
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
|
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
|
||||||
if err != nil && status.Code(err) == codes.Unimplemented {
|
if err != nil && status.Code(err) == codes.Unimplemented {
|
||||||
|
|
38
pool/pool.go
38
pool/pool.go
|
@ -9,7 +9,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"runtime/trace"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -418,8 +417,9 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (acco
|
||||||
return accounting.Decimal{}, err
|
return accounting.Decimal{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cliPrm sdkClient.PrmBalanceGet
|
cliPrm := sdkClient.PrmBalanceGet{
|
||||||
cliPrm.SetAccount(prm.account)
|
Account: &prm.account,
|
||||||
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.BalanceGet(ctx, cliPrm)
|
res, err := cl.BalanceGet(ctx, cliPrm)
|
||||||
|
@ -503,8 +503,9 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cliPrm sdkClient.PrmContainerList
|
cliPrm := sdkClient.PrmContainerList{
|
||||||
cliPrm.SetAccount(prm.ownerID)
|
Account: &prm.ownerID,
|
||||||
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ContainerList(ctx, cliPrm)
|
res, err := cl.ContainerList(ctx, cliPrm)
|
||||||
|
@ -687,9 +688,6 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
|
||||||
|
|
||||||
// objectPut writes object to FrostFS.
|
// objectPut writes object to FrostFS.
|
||||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||||
ctx, task := trace.NewTask(ctx, "obj.put")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
if prm.bufferMaxSize == 0 {
|
if prm.bufferMaxSize == 0 {
|
||||||
prm.bufferMaxSize = defaultBufferMaxSizeForPut
|
prm.bufferMaxSize = defaultBufferMaxSizeForPut
|
||||||
}
|
}
|
||||||
|
@ -876,9 +874,6 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
|
||||||
|
|
||||||
// objectGet returns reader for object.
|
// objectGet returns reader for object.
|
||||||
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
||||||
ctx, task := trace.NewTask(ctx, "obj.getinit")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
cl, err := c.getClient()
|
cl, err := c.getClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ResGetObject{}, err
|
return ResGetObject{}, err
|
||||||
|
@ -1005,21 +1000,12 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
|
||||||
return ResObjectSearch{}, err
|
return ResObjectSearch{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cliPrm sdkClient.PrmObjectSearch
|
cliPrm := sdkClient.PrmObjectSearch{
|
||||||
|
ContainerID: &prm.cnrID,
|
||||||
cliPrm.InContainer(prm.cnrID)
|
Filters: prm.filters,
|
||||||
cliPrm.SetFilters(prm.filters)
|
Session: prm.stoken,
|
||||||
|
BearerToken: prm.btoken,
|
||||||
if prm.stoken != nil {
|
Key: prm.key,
|
||||||
cliPrm.WithinSession(*prm.stoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
if prm.btoken != nil {
|
|
||||||
cliPrm.WithBearerToken(*prm.btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
if prm.key != nil {
|
|
||||||
cliPrm.UseKey(*prm.key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := cl.ObjectSearchInit(ctx, cliPrm)
|
res, err := cl.ObjectSearchInit(ctx, cliPrm)
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/trace"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
apiClient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
apiClient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
@ -64,22 +63,16 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
reg := trace.StartRegion(ctx, "tree.updatenodehealth.dial")
|
|
||||||
if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil {
|
if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil {
|
||||||
reg.End()
|
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
reg.End()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wasHealthy := c.healthy
|
wasHealthy := c.healthy
|
||||||
reg := trace.StartRegion(ctx, "tree.updatenodehealth.healthcheck")
|
|
||||||
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
||||||
c.healthy = false
|
c.healthy = false
|
||||||
reg.End()
|
|
||||||
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
|
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
|
||||||
}
|
}
|
||||||
reg.End()
|
|
||||||
|
|
||||||
c.healthy = true
|
c.healthy = true
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"runtime/trace"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -61,7 +60,6 @@ type InitParameters struct {
|
||||||
clientRebalanceInterval time.Duration
|
clientRebalanceInterval time.Duration
|
||||||
nodeParams []pool.NodeParam
|
nodeParams []pool.NodeParam
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
maxRequestAttempts int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
||||||
|
@ -80,8 +78,6 @@ type Pool struct {
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
|
||||||
maxRequestAttempts int
|
|
||||||
|
|
||||||
startIndicesMtx sync.RWMutex
|
startIndicesMtx sync.RWMutex
|
||||||
// startIndices points to the client from which the next request will be executed.
|
// startIndices points to the client from which the next request will be executed.
|
||||||
// Since clients are stored in innerPool field we have to use two indices.
|
// Since clients are stored in innerPool field we have to use two indices.
|
||||||
|
@ -181,7 +177,6 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
nodeRequestTimeout: options.healthcheckTimeout,
|
nodeRequestTimeout: options.healthcheckTimeout,
|
||||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
clientRebalanceInterval: options.clientRebalanceInterval,
|
||||||
},
|
},
|
||||||
maxRequestAttempts: options.maxRequestAttempts,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
|
@ -273,21 +268,12 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||||
x.dialOptions = opts
|
x.dialOptions = opts
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetMaxRequestAttempts sets the max attempt to make successful request.
|
|
||||||
// Default value is 0 that means the number of attempts equals to number of nodes in pool.
|
|
||||||
func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
|
|
||||||
x.maxRequestAttempts = maxAttempts
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodes invokes eponymous method from TreeServiceClient.
|
// GetNodes invokes eponymous method from TreeServiceClient.
|
||||||
//
|
//
|
||||||
// Can return predefined errors:
|
// Can return predefined errors:
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
|
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
|
||||||
ctx, task := trace.NewTask(ctx, "tree.getnodes")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
request := &grpcService.GetNodeByPathRequest{
|
request := &grpcService.GetNodeByPathRequest{
|
||||||
Body: &grpcService.GetNodeByPathRequest_Body{
|
Body: &grpcService.GetNodeByPathRequest_Body{
|
||||||
ContainerId: prm.CID[:],
|
ContainerId: prm.CID[:],
|
||||||
|
@ -311,10 +297,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
reg := trace.StartRegion(ctx, "tree.getnodes.single")
|
|
||||||
defer reg.End()
|
|
||||||
|
|
||||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
|
@ -411,7 +394,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
var cli grpcService.TreeService_GetSubTreeClient
|
var cli grpcService.TreeService_GetSubTreeClient
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
cli, inErr = client.GetSubTree(ctx, request)
|
cli, inErr = client.GetSubTree(ctx, request)
|
||||||
return handleError("failed to get sub tree client", inErr)
|
return handleError("failed to get sub tree client", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -427,9 +410,6 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
ctx, task := trace.NewTask(ctx, "tree.addnode")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
request := &grpcService.AddRequest{
|
request := &grpcService.AddRequest{
|
||||||
Body: &grpcService.AddRequest_Body{
|
Body: &grpcService.AddRequest_Body{
|
||||||
ContainerId: prm.CID[:],
|
ContainerId: prm.CID[:],
|
||||||
|
@ -449,10 +429,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
var resp *grpcService.AddResponse
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
reg := trace.StartRegion(ctx, "tree.addnode.single")
|
|
||||||
defer reg.End()
|
|
||||||
|
|
||||||
resp, inErr = client.Add(ctx, request)
|
resp, inErr = client.Add(ctx, request)
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -468,9 +445,6 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
|
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
|
||||||
ctx, task := trace.NewTask(ctx, "tree.addnodebypath")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
request := &grpcService.AddByPathRequest{
|
request := &grpcService.AddByPathRequest{
|
||||||
Body: &grpcService.AddByPathRequest_Body{
|
Body: &grpcService.AddByPathRequest_Body{
|
||||||
ContainerId: prm.CID[:],
|
ContainerId: prm.CID[:],
|
||||||
|
@ -492,10 +466,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
var resp *grpcService.AddByPathResponse
|
||||||
if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
reg := trace.StartRegion(ctx, "tree.addnodebypath.single")
|
|
||||||
defer reg.End()
|
|
||||||
|
|
||||||
resp, inErr = client.AddByPath(ctx, request)
|
resp, inErr = client.AddByPath(ctx, request)
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -519,9 +490,6 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
ctx, task := trace.NewTask(ctx, "tree.movenode")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
request := &grpcService.MoveRequest{
|
request := &grpcService.MoveRequest{
|
||||||
Body: &grpcService.MoveRequest_Body{
|
Body: &grpcService.MoveRequest_Body{
|
||||||
ContainerId: prm.CID[:],
|
ContainerId: prm.CID[:],
|
||||||
|
@ -542,10 +510,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||||
reg := trace.StartRegion(ctx, "tree.movenode.single")
|
|
||||||
defer reg.End()
|
|
||||||
|
|
||||||
if _, err := client.Move(ctx, request); err != nil {
|
if _, err := client.Move(ctx, request); err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
|
@ -559,9 +524,6 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
ctx, task := trace.NewTask(ctx, "tree.removenode")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
request := &grpcService.RemoveRequest{
|
request := &grpcService.RemoveRequest{
|
||||||
Body: &grpcService.RemoveRequest_Body{
|
Body: &grpcService.RemoveRequest_Body{
|
||||||
ContainerId: prm.CID[:],
|
ContainerId: prm.CID[:],
|
||||||
|
@ -579,10 +541,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||||
reg := trace.StartRegion(ctx, "tree.removenode.single")
|
|
||||||
defer reg.End()
|
|
||||||
|
|
||||||
if _, err := client.Remove(ctx, request); err != nil {
|
if _, err := client.Remove(ctx, request); err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
|
@ -669,10 +628,6 @@ func fillDefaultInitParams(params *InitParameters) {
|
||||||
if params.nodeStreamTimeout <= 0 {
|
if params.nodeStreamTimeout <= 0 {
|
||||||
params.nodeStreamTimeout = defaultStreamTimeout
|
params.nodeStreamTimeout = defaultStreamTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.maxRequestAttempts <= 0 {
|
|
||||||
params.maxRequestAttempts = len(params.nodeParams)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||||
|
@ -704,9 +659,6 @@ func (p *Pool) startRebalance(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
|
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
|
||||||
ctx, task := trace.NewTask(ctx, "tree.updatenodehealth")
|
|
||||||
defer task.End()
|
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i, inner := range p.innerPools {
|
for i, inner := range p.innerPools {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -776,31 +728,19 @@ func (p *Pool) setStartIndices(i, j int) {
|
||||||
p.startIndicesMtx.Unlock()
|
p.startIndicesMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
|
func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
|
||||||
var (
|
var (
|
||||||
err, finErr error
|
err, finErr error
|
||||||
cl grpcService.TreeServiceClient
|
cl grpcService.TreeServiceClient
|
||||||
)
|
)
|
||||||
|
|
||||||
reqID := GetRequestID(ctx)
|
|
||||||
|
|
||||||
startI, startJ := p.getStartIndices()
|
startI, startJ := p.getStartIndices()
|
||||||
groupsLen := len(p.innerPools)
|
groupsLen := len(p.innerPools)
|
||||||
attempts := p.maxRequestAttempts
|
|
||||||
for i := startI; i < startI+groupsLen; i++ {
|
for i := startI; i < startI+groupsLen; i++ {
|
||||||
indexI := i % groupsLen
|
indexI := i % groupsLen
|
||||||
clientsLen := len(p.innerPools[indexI].clients)
|
clientsLen := len(p.innerPools[indexI].clients)
|
||||||
for j := startJ; j < startJ+clientsLen; j++ {
|
for j := startJ; j < startJ+clientsLen; j++ {
|
||||||
indexJ := j % clientsLen
|
indexJ := j % clientsLen
|
||||||
|
|
||||||
if attempts == 0 {
|
|
||||||
if startI != indexI || startJ != indexJ {
|
|
||||||
p.setStartIndices(indexI, indexJ)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
attempts--
|
|
||||||
|
|
||||||
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
||||||
err = fn(cl)
|
err = fn(cl)
|
||||||
}
|
}
|
||||||
|
@ -810,10 +750,8 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
finErr = finalError(finErr, err)
|
finErr = finalError(finErr, err)
|
||||||
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
|
p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
||||||
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
|
||||||
}
|
}
|
||||||
startJ = 0
|
startJ = 0
|
||||||
}
|
}
|
||||||
|
@ -834,9 +772,6 @@ func prioErr(err error) int {
|
||||||
case errors.Is(err, ErrNodeNotFound) ||
|
case errors.Is(err, ErrNodeNotFound) ||
|
||||||
errors.Is(err, errNodeEmptyResult):
|
errors.Is(err, errNodeEmptyResult):
|
||||||
return 200
|
return 200
|
||||||
case errors.Is(err, context.Canceled) ||
|
|
||||||
errors.Is(err, context.DeadlineExceeded):
|
|
||||||
return 250
|
|
||||||
case errors.Is(err, ErrUnhealthyEndpoint):
|
case errors.Is(err, ErrUnhealthyEndpoint):
|
||||||
return 300
|
return 300
|
||||||
default:
|
default:
|
||||||
|
@ -856,16 +791,3 @@ func finalError(current, candidate error) error {
|
||||||
|
|
||||||
return current
|
return current
|
||||||
}
|
}
|
||||||
|
|
||||||
type reqKeyType string
|
|
||||||
|
|
||||||
const reqIDKey = reqKeyType("request_id")
|
|
||||||
|
|
||||||
func SetRequestID(ctx context.Context, reqID string) context.Context {
|
|
||||||
return context.WithValue(ctx, reqIDKey, reqID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetRequestID(ctx context.Context) string {
|
|
||||||
reqID, _ := ctx.Value(reqIDKey).(string)
|
|
||||||
return reqID
|
|
||||||
}
|
|
||||||
|
|
|
@ -77,21 +77,14 @@ func TestHandleError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
ctx := context.Background()
|
|
||||||
nodes := [][]string{
|
nodes := [][]string{
|
||||||
{"node00", "node01", "node02", "node03"},
|
{"node00", "node01", "node02", "node03"},
|
||||||
{"node10", "node11", "node12", "node13"},
|
{"node10", "node11", "node12", "node13"},
|
||||||
}
|
}
|
||||||
|
|
||||||
var lenNodes int
|
|
||||||
for i := range nodes {
|
|
||||||
lenNodes += len(nodes[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
logger: zaptest.NewLogger(t),
|
logger: zaptest.NewLogger(t),
|
||||||
innerPools: makeInnerPool(nodes),
|
innerPools: makeInnerPool(nodes),
|
||||||
maxRequestAttempts: lenNodes,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
makeFn := func(client grpcService.TreeServiceClient) error {
|
makeFn := func(client grpcService.TreeServiceClient) error {
|
||||||
|
@ -99,14 +92,14 @@ func TestRetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("first ok", func(t *testing.T) {
|
t.Run("first ok", func(t *testing.T) {
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("first failed", func(t *testing.T) {
|
t.Run("first failed", func(t *testing.T) {
|
||||||
setErrors(p, "node00")
|
setErrors(p, "node00")
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 1)
|
checkIndicesAndReset(t, p, 0, 1)
|
||||||
})
|
})
|
||||||
|
@ -114,7 +107,7 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("all failed", func(t *testing.T) {
|
t.Run("all failed", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodes[0]...)
|
||||||
setErrors(p, nodes[1]...)
|
setErrors(p, nodes[1]...)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
@ -122,13 +115,13 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("round", func(t *testing.T) {
|
t.Run("round", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0][0], nodes[0][1])
|
setErrors(p, nodes[0][0], nodes[0][1])
|
||||||
setErrors(p, nodes[1]...)
|
setErrors(p, nodes[1]...)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndices(t, p, 0, 2)
|
checkIndices(t, p, 0, 2)
|
||||||
resetClientsErrors(p)
|
resetClientsErrors(p)
|
||||||
|
|
||||||
setErrors(p, nodes[0][2], nodes[0][3])
|
setErrors(p, nodes[0][2], nodes[0][3])
|
||||||
err = p.requestWithRetry(ctx, makeFn)
|
err = p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
@ -136,14 +129,14 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("group switch", func(t *testing.T) {
|
t.Run("group switch", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodes[0]...)
|
||||||
setErrors(p, nodes[1][0])
|
setErrors(p, nodes[1][0])
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 1, 1)
|
checkIndicesAndReset(t, p, 1, 1)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("group round", func(t *testing.T) {
|
t.Run("group round", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0][1:]...)
|
setErrors(p, nodes[0][1:]...)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
@ -151,7 +144,7 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("group round switch", func(t *testing.T) {
|
t.Run("group round switch", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodes[0]...)
|
||||||
p.setStartIndices(0, 1)
|
p.setStartIndices(0, 1)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 1, 0)
|
checkIndicesAndReset(t, p, 1, 0)
|
||||||
})
|
})
|
||||||
|
@ -159,14 +152,14 @@ func TestRetry(t *testing.T) {
|
||||||
t.Run("no panic group switch", func(t *testing.T) {
|
t.Run("no panic group switch", func(t *testing.T) {
|
||||||
setErrors(p, nodes[1]...)
|
setErrors(p, nodes[1]...)
|
||||||
p.setStartIndices(1, 0)
|
p.setStartIndices(1, 0)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err := p.requestWithRetry(makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("error empty result", func(t *testing.T) {
|
t.Run("error empty result", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return errNodeEmptyResult
|
return errNodeEmptyResult
|
||||||
|
@ -179,7 +172,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error not found", func(t *testing.T) {
|
t.Run("error not found", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return ErrNodeNotFound
|
return ErrNodeNotFound
|
||||||
|
@ -192,7 +185,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error access denied", func(t *testing.T) {
|
t.Run("error access denied", func(t *testing.T) {
|
||||||
var index int
|
var index int
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
|
||||||
index++
|
index++
|
||||||
return ErrNodeAccessDenied
|
return ErrNodeAccessDenied
|
||||||
})
|
})
|
||||||
|
@ -200,18 +193,6 @@ func TestRetry(t *testing.T) {
|
||||||
require.Equal(t, 1, index)
|
require.Equal(t, 1, index)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkIndicesAndReset(t, p, 0, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("limit attempts", func(t *testing.T) {
|
|
||||||
oldVal := p.maxRequestAttempts
|
|
||||||
p.maxRequestAttempts = 2
|
|
||||||
setErrors(p, nodes[0]...)
|
|
||||||
setErrors(p, nodes[1]...)
|
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.Error(t, err)
|
|
||||||
checkIndicesAndReset(t, p, 0, 2)
|
|
||||||
p.maxRequestAttempts = oldVal
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRebalance(t *testing.T) {
|
func TestRebalance(t *testing.T) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue