forked from TrueCloudLab/frostfs-node
Compare commits
10 commits
5aef303259
...
990e984023
Author | SHA1 | Date | |
---|---|---|---|
990e984023 | |||
a83dd231ec | |||
51f0bf8be0 | |||
52c04ef475 | |||
8423e0f6f9 | |||
08593f664b | |||
eec359cfa8 | |||
43d6fbf73b | |||
01e18eda43 | |||
3e89e744aa |
33 changed files with 418 additions and 90 deletions
|
@ -8,7 +8,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go_versions: [ '1.21', '1.22' ]
|
go_versions: [ '1.22', '1.23' ]
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
|
|
@ -16,7 +16,7 @@ jobs:
|
||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: 1.22
|
go-version: 1.23
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
run: |
|
run: |
|
||||||
apt update
|
apt update
|
||||||
|
|
|
@ -11,7 +11,7 @@ jobs:
|
||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: '1.22'
|
go-version: '1.23'
|
||||||
cache: true
|
cache: true
|
||||||
|
|
||||||
- name: Install linters
|
- name: Install linters
|
||||||
|
@ -25,7 +25,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go_versions: [ '1.21', '1.22' ]
|
go_versions: [ '1.22', '1.23' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
@ -48,7 +48,7 @@ jobs:
|
||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: '1.21'
|
go-version: '1.22'
|
||||||
cache: true
|
cache: true
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
|
@ -63,7 +63,7 @@ jobs:
|
||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: '1.22'
|
go-version: '1.23'
|
||||||
cache: true
|
cache: true
|
||||||
|
|
||||||
- name: Install staticcheck
|
- name: Install staticcheck
|
||||||
|
@ -81,7 +81,7 @@ jobs:
|
||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: '1.21'
|
go-version: '1.22'
|
||||||
cache: true
|
cache: true
|
||||||
|
|
||||||
- name: Install gopls
|
- name: Install gopls
|
||||||
|
|
|
@ -13,7 +13,7 @@ jobs:
|
||||||
- name: Setup Go
|
- name: Setup Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: '1.22'
|
go-version: '1.23'
|
||||||
|
|
||||||
- name: Install govulncheck
|
- name: Install govulncheck
|
||||||
run: go install golang.org/x/vuln/cmd/govulncheck@latest
|
run: go install golang.org/x/vuln/cmd/govulncheck@latest
|
||||||
|
|
|
@ -12,7 +12,8 @@ run:
|
||||||
# output configuration options
|
# output configuration options
|
||||||
output:
|
output:
|
||||||
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
|
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
|
||||||
format: tab
|
formats:
|
||||||
|
- format: tab
|
||||||
|
|
||||||
# all available settings of specific linters
|
# all available settings of specific linters
|
||||||
linters-settings:
|
linters-settings:
|
||||||
|
|
10
Makefile
10
Makefile
|
@ -8,8 +8,8 @@ HUB_IMAGE ?= truecloudlab/frostfs
|
||||||
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
|
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
|
||||||
|
|
||||||
GO_VERSION ?= 1.22
|
GO_VERSION ?= 1.22
|
||||||
LINT_VERSION ?= 1.56.1
|
LINT_VERSION ?= 1.60.1
|
||||||
TRUECLOUDLAB_LINT_VERSION ?= 0.0.5
|
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
|
||||||
PROTOC_VERSION ?= 25.0
|
PROTOC_VERSION ?= 25.0
|
||||||
PROTOC_GEN_GO_VERSION ?= $(shell go list -f '{{.Version}}' -m google.golang.org/protobuf)
|
PROTOC_GEN_GO_VERSION ?= $(shell go list -f '{{.Version}}' -m google.golang.org/protobuf)
|
||||||
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
|
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
|
||||||
|
@ -17,7 +17,7 @@ PROTOC_OS_VERSION=osx-x86_64
|
||||||
ifeq ($(shell uname), Linux)
|
ifeq ($(shell uname), Linux)
|
||||||
PROTOC_OS_VERSION=linux-x86_64
|
PROTOC_OS_VERSION=linux-x86_64
|
||||||
endif
|
endif
|
||||||
STATICCHECK_VERSION ?= 2023.1.6
|
STATICCHECK_VERSION ?= 2024.1.1
|
||||||
ARCH = amd64
|
ARCH = amd64
|
||||||
|
|
||||||
BIN = bin
|
BIN = bin
|
||||||
|
@ -190,7 +190,7 @@ lint-install:
|
||||||
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
|
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
|
||||||
@rm -rf $(TMP_DIR)/linters
|
@rm -rf $(TMP_DIR)/linters
|
||||||
@rmdir $(TMP_DIR) 2>/dev/null || true
|
@rmdir $(TMP_DIR) 2>/dev/null || true
|
||||||
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install -trimpath github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
||||||
|
|
||||||
# Run linters
|
# Run linters
|
||||||
lint:
|
lint:
|
||||||
|
@ -210,7 +210,7 @@ staticcheck-run:
|
||||||
@if [ ! -d "$(STATICCHECK_VERSION_DIR)" ]; then \
|
@if [ ! -d "$(STATICCHECK_VERSION_DIR)" ]; then \
|
||||||
make staticcheck-install; \
|
make staticcheck-install; \
|
||||||
fi
|
fi
|
||||||
@$(STATICCHECK_VERSION_DIR)/staticcheck ./...
|
@$(STATICCHECK_VERSION_DIR)/staticcheck -checks inherit,-SA1019 ./...
|
||||||
|
|
||||||
# Install gopls
|
# Install gopls
|
||||||
gopls-install:
|
gopls-install:
|
||||||
|
|
|
@ -49,7 +49,7 @@ The latest version of frostfs-node works with frostfs-contract
|
||||||
|
|
||||||
# Building
|
# Building
|
||||||
|
|
||||||
To make all binaries you need Go 1.21+ and `make`:
|
To make all binaries you need Go 1.22+ and `make`:
|
||||||
```
|
```
|
||||||
make all
|
make all
|
||||||
```
|
```
|
||||||
|
|
|
@ -1070,7 +1070,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
||||||
c.onShutdown(func() {
|
c.onShutdown(func() {
|
||||||
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
||||||
|
|
||||||
err := ls.Close(context.Background())
|
err := ls.Close(context.WithoutCancel(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
|
|
@ -21,7 +21,7 @@ func initTracing(ctx context.Context, c *cfg) {
|
||||||
c.closers = append(c.closers, closer{
|
c.closers = append(c.closers, closer{
|
||||||
name: "tracing",
|
name: "tracing",
|
||||||
fn: func() {
|
fn: func() {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*5)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err := tracing.Shutdown(ctx) // cfg context cancels before close
|
err := tracing.Shutdown(ctx) // cfg context cancels before close
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
34
go.mod
34
go.mod
|
@ -1,14 +1,14 @@
|
||||||
module git.frostfs.info/TrueCloudLab/frostfs-node
|
module git.frostfs.info/TrueCloudLab/frostfs-node
|
||||||
|
|
||||||
go 1.21
|
go 1.22
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/sdk/gitea v0.17.1
|
code.gitea.io/sdk/gitea v0.17.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240906121927-2c79f770e449
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240906135451-a7aabe53491c
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
|
@ -23,7 +23,7 @@ require (
|
||||||
github.com/mitchellh/go-homedir v1.1.0
|
github.com/mitchellh/go-homedir v1.1.0
|
||||||
github.com/mr-tron/base58 v1.2.0
|
github.com/mr-tron/base58 v1.2.0
|
||||||
github.com/multiformats/go-multiaddr v0.12.1
|
github.com/multiformats/go-multiaddr v0.12.1
|
||||||
github.com/nspcc-dev/neo-go v0.106.2
|
github.com/nspcc-dev/neo-go v0.106.3
|
||||||
github.com/olekukonko/tablewriter v0.0.5
|
github.com/olekukonko/tablewriter v0.0.5
|
||||||
github.com/panjf2000/ants/v2 v2.9.0
|
github.com/panjf2000/ants/v2 v2.9.0
|
||||||
github.com/paulmach/orb v0.11.0
|
github.com/paulmach/orb v0.11.0
|
||||||
|
@ -39,11 +39,11 @@ require (
|
||||||
go.opentelemetry.io/otel/trace v1.22.0
|
go.opentelemetry.io/otel/trace v1.22.0
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
|
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
|
||||||
golang.org/x/sync v0.6.0
|
golang.org/x/sync v0.8.0
|
||||||
golang.org/x/sys v0.18.0
|
golang.org/x/sys v0.25.0
|
||||||
golang.org/x/term v0.18.0
|
golang.org/x/term v0.24.0
|
||||||
google.golang.org/grpc v1.63.2
|
google.golang.org/grpc v1.66.0
|
||||||
google.golang.org/protobuf v1.33.0
|
google.golang.org/protobuf v1.34.2
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -60,10 +60,10 @@ require (
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/bits-and-blooms/bitset v1.13.0 // indirect
|
github.com/bits-and-blooms/bitset v1.13.0 // indirect
|
||||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
github.com/consensys/bavard v0.1.13 // indirect
|
github.com/consensys/bavard v0.1.13 // indirect
|
||||||
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
|
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
|
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
github.com/davidmz/go-pageant v1.0.2 // indirect
|
github.com/davidmz/go-pageant v1.0.2 // indirect
|
||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||||
|
@ -96,7 +96,7 @@ require (
|
||||||
github.com/multiformats/go-multihash v0.2.3 // indirect
|
github.com/multiformats/go-multihash v0.2.3 // indirect
|
||||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
||||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240727093519-1a48f1ce43ec // indirect
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
|
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
|
@ -118,11 +118,11 @@ require (
|
||||||
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
|
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
|
||||||
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
|
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/crypto v0.21.0 // indirect
|
golang.org/x/crypto v0.27.0 // indirect
|
||||||
golang.org/x/net v0.23.0 // indirect
|
golang.org/x/net v0.29.0 // indirect
|
||||||
golang.org/x/text v0.14.0 // indirect
|
golang.org/x/text v0.18.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||||
lukechampine.com/blake3 v1.2.1 // indirect
|
lukechampine.com/blake3 v1.2.1 // indirect
|
||||||
rsc.io/tmplfunc v0.0.3 // indirect
|
rsc.io/tmplfunc v0.0.3 // indirect
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -27,21 +27,21 @@ type objectDesc struct {
|
||||||
storageID []byte
|
storageID []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAll(t *testing.T, cons Constructor, min, max uint64) {
|
func TestAll(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
t.Run("get", func(t *testing.T) {
|
t.Run("get", func(t *testing.T) {
|
||||||
TestGet(t, cons, min, max)
|
TestGet(t, cons, minSize, maxSize)
|
||||||
})
|
})
|
||||||
t.Run("get range", func(t *testing.T) {
|
t.Run("get range", func(t *testing.T) {
|
||||||
TestGetRange(t, cons, min, max)
|
TestGetRange(t, cons, minSize, maxSize)
|
||||||
})
|
})
|
||||||
t.Run("delete", func(t *testing.T) {
|
t.Run("delete", func(t *testing.T) {
|
||||||
TestDelete(t, cons, min, max)
|
TestDelete(t, cons, minSize, maxSize)
|
||||||
})
|
})
|
||||||
t.Run("exists", func(t *testing.T) {
|
t.Run("exists", func(t *testing.T) {
|
||||||
TestExists(t, cons, min, max)
|
TestExists(t, cons, minSize, maxSize)
|
||||||
})
|
})
|
||||||
t.Run("iterate", func(t *testing.T) {
|
t.Run("iterate", func(t *testing.T) {
|
||||||
TestIterate(t, cons, min, max)
|
TestIterate(t, cons, minSize, maxSize)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,12 +51,12 @@ func TestInfo(t *testing.T, cons Constructor, expectedType string, expectedPath
|
||||||
require.Equal(t, expectedPath, s.Path())
|
require.Equal(t, expectedPath, s.Path())
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepare(t *testing.T, count int, s common.Storage, min, max uint64) []objectDesc {
|
func prepare(t *testing.T, count int, s common.Storage, minSize, maxSize uint64) []objectDesc {
|
||||||
objects := make([]objectDesc, count)
|
objects := make([]objectDesc, count)
|
||||||
|
|
||||||
r := mrand.New(mrand.NewSource(0))
|
r := mrand.New(mrand.NewSource(0))
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
objects[i].obj = NewObject(min + uint64(r.Intn(int(max-min+1)))) // not too large
|
objects[i].obj = NewObject(minSize + uint64(r.Intn(int(maxSize-minSize+1)))) // not too large
|
||||||
objects[i].addr = objectCore.AddressOf(objects[i].obj)
|
objects[i].addr = objectCore.AddressOf(objects[i].obj)
|
||||||
|
|
||||||
raw, err := objects[i].obj.Marshal()
|
raw, err := objects[i].obj.Marshal()
|
||||||
|
|
|
@ -13,12 +13,12 @@ import (
|
||||||
|
|
||||||
// TestControl checks correctness of a read-only mode.
|
// TestControl checks correctness of a read-only mode.
|
||||||
// cons must return a storage which is NOT opened.
|
// cons must return a storage which is NOT opened.
|
||||||
func TestControl(t *testing.T, cons Constructor, min, max uint64) {
|
func TestControl(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
objects := prepare(t, 10, s, min, max)
|
objects := prepare(t, 10, s, minSize, maxSize)
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
|
|
||||||
require.NoError(t, s.Open(mode.ComponentReadOnly))
|
require.NoError(t, s.Open(mode.ComponentReadOnly))
|
||||||
|
@ -34,7 +34,7 @@ func TestControl(t *testing.T, cons Constructor, min, max uint64) {
|
||||||
|
|
||||||
t.Run("put fails", func(t *testing.T) {
|
t.Run("put fails", func(t *testing.T) {
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
prm.Object = NewObject(min + uint64(rand.Intn(int(max-min+1))))
|
prm.Object = NewObject(minSize + uint64(rand.Intn(int(maxSize-minSize+1))))
|
||||||
prm.Address = objectCore.AddressOf(prm.Object)
|
prm.Address = objectCore.AddressOf(prm.Object)
|
||||||
|
|
||||||
_, err := s.Put(context.Background(), prm)
|
_, err := s.Put(context.Background(), prm)
|
||||||
|
|
|
@ -11,13 +11,13 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
|
func TestDelete(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
defer func() { require.NoError(t, s.Close()) }()
|
defer func() { require.NoError(t, s.Close()) }()
|
||||||
|
|
||||||
objects := prepare(t, 4, s, min, max)
|
objects := prepare(t, 4, s, minSize, maxSize)
|
||||||
|
|
||||||
t.Run("delete non-existent", func(t *testing.T) {
|
t.Run("delete non-existent", func(t *testing.T) {
|
||||||
var prm common.DeletePrm
|
var prm common.DeletePrm
|
||||||
|
|
|
@ -10,13 +10,13 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestExists(t *testing.T, cons Constructor, min, max uint64) {
|
func TestExists(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
defer func() { require.NoError(t, s.Close()) }()
|
defer func() { require.NoError(t, s.Close()) }()
|
||||||
|
|
||||||
objects := prepare(t, 1, s, min, max)
|
objects := prepare(t, 1, s, minSize, maxSize)
|
||||||
|
|
||||||
t.Run("missing object", func(t *testing.T) {
|
t.Run("missing object", func(t *testing.T) {
|
||||||
prm := common.ExistsPrm{Address: oidtest.Address()}
|
prm := common.ExistsPrm{Address: oidtest.Address()}
|
||||||
|
|
|
@ -11,13 +11,13 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGet(t *testing.T, cons Constructor, min, max uint64) {
|
func TestGet(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
defer func() { require.NoError(t, s.Close()) }()
|
defer func() { require.NoError(t, s.Close()) }()
|
||||||
|
|
||||||
objects := prepare(t, 2, s, min, max)
|
objects := prepare(t, 2, s, minSize, maxSize)
|
||||||
|
|
||||||
t.Run("missing object", func(t *testing.T) {
|
t.Run("missing object", func(t *testing.T) {
|
||||||
gPrm := common.GetPrm{Address: oidtest.Address()}
|
gPrm := common.GetPrm{Address: oidtest.Address()}
|
||||||
|
|
|
@ -13,13 +13,13 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
|
func TestGetRange(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
defer func() { require.NoError(t, s.Close()) }()
|
defer func() { require.NoError(t, s.Close()) }()
|
||||||
|
|
||||||
objects := prepare(t, 1, s, min, max)
|
objects := prepare(t, 1, s, minSize, maxSize)
|
||||||
|
|
||||||
t.Run("missing object", func(t *testing.T) {
|
t.Run("missing object", func(t *testing.T) {
|
||||||
gPrm := common.GetRangePrm{Address: oidtest.Address()}
|
gPrm := common.GetRangePrm{Address: oidtest.Address()}
|
||||||
|
|
|
@ -10,13 +10,13 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestIterate(t *testing.T, cons Constructor, min, max uint64) {
|
func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
defer func() { require.NoError(t, s.Close()) }()
|
defer func() { require.NoError(t, s.Close()) }()
|
||||||
|
|
||||||
objects := prepare(t, 10, s, min, max)
|
objects := prepare(t, 10, s, minSize, maxSize)
|
||||||
|
|
||||||
// Delete random object to ensure it is not iterated over.
|
// Delete random object to ensure it is not iterated over.
|
||||||
const delID = 2
|
const delID = 2
|
||||||
|
|
|
@ -1161,6 +1161,7 @@ func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *f
|
||||||
lastFilename = nil
|
lastFilename = nil
|
||||||
nodes = nil
|
nodes = nil
|
||||||
length = actualLength + 1
|
length = actualLength + 1
|
||||||
|
count = 0
|
||||||
c.Seek(append(prefix, byte(length), byte(length>>8)))
|
c.Seek(append(prefix, byte(length), byte(length>>8)))
|
||||||
c.Prev() // c.Next() will be performed by for loop
|
c.Prev() // c.Next() will be performed by for loop
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package pilorama
|
package pilorama
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
mrand "math/rand"
|
mrand "math/rand"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -232,6 +234,65 @@ func BenchmarkForestSortedIteration(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The issue which we call "BugWithSkip" is easiest to understand when filenames are
|
||||||
|
// monotonically increasing numbers. We want the list of sorted filenames to have different length interleaved.
|
||||||
|
// The bug happens when we switch between length during listing.
|
||||||
|
// Thus this test contains numbers from 1 to 2000 and batch size of size 10.
|
||||||
|
func TestForest_TreeSortedIterationBugWithSkip(t *testing.T) {
|
||||||
|
for i := range providers {
|
||||||
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
|
testForestTreeSortedIterationBugWithSkip(t, providers[i].construct(t))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testForestTreeSortedIterationBugWithSkip(t *testing.T, s ForestStorage) {
|
||||||
|
defer func() { require.NoError(t, s.Close()) }()
|
||||||
|
|
||||||
|
cid := cidtest.ID()
|
||||||
|
d := CIDDescriptor{cid, 0, 1}
|
||||||
|
treeID := "version"
|
||||||
|
treeAdd := func(t *testing.T, ts int, filename string) {
|
||||||
|
_, err := s.TreeMove(context.Background(), d, treeID, &Move{
|
||||||
|
Child: RootID + uint64(ts),
|
||||||
|
Parent: RootID,
|
||||||
|
Meta: Meta{
|
||||||
|
Time: Timestamp(ts),
|
||||||
|
Items: []KeyValue{
|
||||||
|
{Key: AttributeFilename, Value: []byte(filename)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
const count = 2000
|
||||||
|
treeAdd(t, 1, "")
|
||||||
|
for i := 1; i < count; i++ {
|
||||||
|
treeAdd(t, i+1, strconv.Itoa(i+1))
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []MultiNodeInfo
|
||||||
|
treeAppend := func(t *testing.T, last *string, count int) *string {
|
||||||
|
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, MultiNode{RootID}, last, count)
|
||||||
|
require.NoError(t, err)
|
||||||
|
result = append(result, res...)
|
||||||
|
return cursor
|
||||||
|
}
|
||||||
|
|
||||||
|
const batchSize = 10
|
||||||
|
last := treeAppend(t, nil, batchSize)
|
||||||
|
for i := 1; i < count/batchSize; i++ {
|
||||||
|
last = treeAppend(t, last, batchSize)
|
||||||
|
}
|
||||||
|
require.Len(t, result, count)
|
||||||
|
require.True(t, slices.IsSortedFunc(result, func(a, b MultiNodeInfo) int {
|
||||||
|
filenameA := findAttr(a.Meta, AttributeFilename)
|
||||||
|
filenameB := findAttr(b.Meta, AttributeFilename)
|
||||||
|
return bytes.Compare(filenameA, filenameB)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
func TestForest_TreeSortedIteration(t *testing.T) {
|
func TestForest_TreeSortedIteration(t *testing.T) {
|
||||||
for i := range providers {
|
for i := range providers {
|
||||||
t.Run(providers[i].name, func(t *testing.T) {
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
|
|
|
@ -2,6 +2,8 @@ package pilorama
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type heapInfo struct {
|
type heapInfo struct {
|
||||||
|
@ -28,9 +30,10 @@ func (h *filenameHeap) Pop() any {
|
||||||
|
|
||||||
// fixedHeap maintains a fixed number of smallest elements started at some point.
|
// fixedHeap maintains a fixed number of smallest elements started at some point.
|
||||||
type fixedHeap struct {
|
type fixedHeap struct {
|
||||||
start *string
|
start *string
|
||||||
count int
|
sorted bool
|
||||||
h *filenameHeap
|
count int
|
||||||
|
h *filenameHeap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHeap(start *string, count int) *fixedHeap {
|
func newHeap(start *string, count int) *fixedHeap {
|
||||||
|
@ -44,20 +47,39 @@ func newHeap(start *string, count int) *fixedHeap {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const amortizationMultiplier = 5
|
||||||
|
|
||||||
func (h *fixedHeap) push(id MultiNode, filename string) bool {
|
func (h *fixedHeap) push(id MultiNode, filename string) bool {
|
||||||
if h.start != nil && filename <= *h.start {
|
if h.start != nil && filename <= *h.start {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
heap.Push(h.h, heapInfo{id: id, filename: filename})
|
|
||||||
if h.h.Len() > h.count {
|
*h.h = append(*h.h, heapInfo{id: id, filename: filename})
|
||||||
heap.Remove(h.h, h.h.Len()-1)
|
h.sorted = false
|
||||||
|
|
||||||
|
if h.h.Len() > h.count*amortizationMultiplier {
|
||||||
|
slices.SortFunc(*h.h, func(a, b heapInfo) int {
|
||||||
|
return strings.Compare(a.filename, b.filename)
|
||||||
|
})
|
||||||
|
*h.h = (*h.h)[:h.count]
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *fixedHeap) pop() (heapInfo, bool) {
|
func (h *fixedHeap) pop() (heapInfo, bool) {
|
||||||
if h.h.Len() != 0 {
|
if !h.sorted {
|
||||||
return heap.Pop(h.h).(heapInfo), true
|
slices.SortFunc(*h.h, func(a, b heapInfo) int {
|
||||||
|
return strings.Compare(a.filename, b.filename)
|
||||||
|
})
|
||||||
|
if len(*h.h) > h.count {
|
||||||
|
*h.h = (*h.h)[:h.count]
|
||||||
|
}
|
||||||
|
h.sorted = true
|
||||||
|
}
|
||||||
|
if len(*h.h) != 0 {
|
||||||
|
info := (*h.h)[0]
|
||||||
|
*h.h = (*h.h)[1:]
|
||||||
|
return info, true
|
||||||
}
|
}
|
||||||
return heapInfo{}, false
|
return heapInfo{}, false
|
||||||
}
|
}
|
||||||
|
|
|
@ -641,8 +641,8 @@ func (c *Client) notaryTxValidationLimit() (uint32, error) {
|
||||||
return 0, fmt.Errorf("can't get current blockchain height: %w", err)
|
return 0, fmt.Errorf("can't get current blockchain height: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
min := bc + c.notary.txValidTime
|
minTime := bc + c.notary.txValidTime
|
||||||
rounded := (min/c.notary.roundTime + 1) * c.notary.roundTime
|
rounded := (minTime/c.notary.roundTime + 1) * c.notary.roundTime
|
||||||
|
|
||||||
return rounded, nil
|
return rounded, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,10 +33,15 @@ func (a *auditService) AddChain(ctx context.Context, req *apemanager.AddChainReq
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var respChainID []byte
|
||||||
|
if respBody := res.GetBody(); respBody != nil {
|
||||||
|
respChainID = respBody.GetChainID()
|
||||||
|
}
|
||||||
|
|
||||||
audit.LogRequest(a.log, ape_grpc.APEManagerService_AddChain_FullMethodName, req,
|
audit.LogRequest(a.log, ape_grpc.APEManagerService_AddChain_FullMethodName, req,
|
||||||
audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(),
|
audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(),
|
||||||
req.GetBody().GetTarget().GetName(),
|
req.GetBody().GetTarget().GetName(),
|
||||||
res.GetBody().GetChainID()),
|
respChainID),
|
||||||
err == nil)
|
err == nil)
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
|
|
|
@ -2,9 +2,11 @@ package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -120,6 +122,12 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||||
exec.log.Debug(logs.OperationFinishedWithError,
|
exec.log.Debug(logs.OperationFinishedWithError,
|
||||||
zap.Error(exec.err),
|
zap.Error(exec.err),
|
||||||
)
|
)
|
||||||
|
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||||
|
if execCnr && errors.As(exec.err, &errAccessDenied) {
|
||||||
|
// Local get can't return access denied error, so this error was returned by
|
||||||
|
// write to the output stream. So there is no need to try to find object on other nodes.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if execCnr {
|
if execCnr {
|
||||||
exec.executeOnContainer(ctx)
|
exec.executeOnContainer(ctx)
|
||||||
|
|
|
@ -31,6 +31,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
var errECInfo *objectSDK.ECInfoError
|
var errECInfo *objectSDK.ECInfoError
|
||||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||||
|
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
@ -38,7 +39,11 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
if r.status != statusEC {
|
if r.status != statusEC {
|
||||||
// for raw requests, continue to collect other parts
|
// for raw requests, continue to collect other parts
|
||||||
r.status = statusUndefined
|
r.status = statusUndefined
|
||||||
r.err = new(apistatus.ObjectNotFound)
|
if errors.As(err, &errAccessDenied) {
|
||||||
|
r.err = err
|
||||||
|
} else {
|
||||||
|
r.err = new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
case err == nil:
|
case err == nil:
|
||||||
|
|
|
@ -23,12 +23,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type getRequestForwarder struct {
|
type getRequestForwarder struct {
|
||||||
OnceResign sync.Once
|
OnceResign sync.Once
|
||||||
OnceHeaderSending sync.Once
|
GlobalProgress int
|
||||||
GlobalProgress int
|
Key *ecdsa.PrivateKey
|
||||||
Key *ecdsa.PrivateKey
|
Request *objectV2.GetRequest
|
||||||
Request *objectV2.GetRequest
|
Stream *streamObjectWriter
|
||||||
Stream *streamObjectWriter
|
|
||||||
|
headerSent bool
|
||||||
|
headerSentGuard sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*objectSDK.Object, error) {
|
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*objectSDK.Object, error) {
|
||||||
|
@ -83,13 +85,15 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
|
||||||
obj.SetSignature(v.GetSignature())
|
obj.SetSignature(v.GetSignature())
|
||||||
obj.SetHeader(v.GetHeader())
|
obj.SetHeader(v.GetHeader())
|
||||||
|
|
||||||
var err error
|
f.headerSentGuard.Lock()
|
||||||
f.OnceHeaderSending.Do(func() {
|
defer f.headerSentGuard.Unlock()
|
||||||
err = f.Stream.WriteHeader(ctx, objectSDK.NewFromV2(obj))
|
if f.headerSent {
|
||||||
})
|
return nil
|
||||||
if err != nil {
|
}
|
||||||
|
if err := f.Stream.WriteHeader(ctx, objectSDK.NewFromV2(obj)); err != nil {
|
||||||
return errCouldNotWriteObjHeader(err)
|
return errCouldNotWriteObjHeader(err)
|
||||||
}
|
}
|
||||||
|
f.headerSent = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,8 @@ type distributedTarget struct {
|
||||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||||
|
|
||||||
relay func(context.Context, nodeDesc) error
|
relay func(context.Context, nodeDesc) error
|
||||||
|
|
||||||
|
resetSuccessAfterOnBroadcast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// parameters and state of container traversal.
|
// parameters and state of container traversal.
|
||||||
|
@ -35,6 +37,8 @@ type traversal struct {
|
||||||
|
|
||||||
// container nodes which was processed during the primary object placement
|
// container nodes which was processed during the primary object placement
|
||||||
mExclude map[string]*bool
|
mExclude map[string]*bool
|
||||||
|
|
||||||
|
resetSuccessAfterOnBroadcast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// updates traversal parameters after the primary placement finish and
|
// updates traversal parameters after the primary placement finish and
|
||||||
|
@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
||||||
// do not track success during container broadcast (best-effort)
|
// do not track success during container broadcast (best-effort)
|
||||||
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
||||||
|
|
||||||
|
if x.resetSuccessAfterOnBroadcast {
|
||||||
|
x.opts = append(x.opts, placement.ResetSuccessAfter())
|
||||||
|
}
|
||||||
|
|
||||||
// avoid 2nd broadcast
|
// avoid 2nd broadcast
|
||||||
x.extraBroadcastEnabled = false
|
x.extraBroadcastEnabled = false
|
||||||
|
|
||||||
|
@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
||||||
|
|
||||||
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||||
|
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||||
return iter.forEachNode(ctx, t.sendObject)
|
return iter.forEachNode(ctx, t.sendObject)
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,14 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
partsProcessed := make([]atomic.Bool, len(parts))
|
||||||
objID, _ := obj.ID()
|
objID, _ := obj.ID()
|
||||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(ctx)
|
|
||||||
for {
|
for {
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
nodes := t.Next()
|
nodes := t.Next()
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
break
|
break
|
||||||
|
@ -216,14 +217,20 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx := range parts {
|
for idx := range parts {
|
||||||
idx := idx
|
if !partsProcessed[idx].Load() {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||||
})
|
if err == nil {
|
||||||
t.SubmitSuccess()
|
partsProcessed[idx].Store(true)
|
||||||
|
t.SubmitSuccess()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
err = eg.Wait()
|
||||||
}
|
}
|
||||||
if err := eg.Wait(); err != nil {
|
if err != nil {
|
||||||
return errIncompletePut{
|
return errIncompletePut{
|
||||||
singleErr: err,
|
singleErr: err,
|
||||||
}
|
}
|
||||||
|
|
191
pkg/services/object/put/ec_test.go
Normal file
191
pkg/services/object/put/ec_test.go
Normal file
|
@ -0,0 +1,191 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/sha256"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testPlacementBuilder struct {
|
||||||
|
vectors [][]netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
|
||||||
|
[][]netmap.NodeInfo, error,
|
||||||
|
) {
|
||||||
|
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
|
||||||
|
copy(arr, p.vectors[0])
|
||||||
|
return [][]netmap.NodeInfo{arr}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type nmKeys struct{}
|
||||||
|
|
||||||
|
func (nmKeys) IsLocalKey(_ []byte) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientConstructor struct {
|
||||||
|
vectors [][]netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
|
||||||
|
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
|
||||||
|
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
|
||||||
|
return multiAddressClient{err: errors.New("node unavailable")}, nil
|
||||||
|
}
|
||||||
|
return multiAddressClient{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type multiAddressClient struct {
|
||||||
|
client.MultiAddressClient
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
|
||||||
|
if c.err != nil {
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
|
return &apiclient.ResObjectPutSingle{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c multiAddressClient) ReportError(error) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestECWriter(t *testing.T) {
|
||||||
|
// Create container with policy EC 1.1
|
||||||
|
cnr := container.Container{}
|
||||||
|
p1 := netmap.PlacementPolicy{}
|
||||||
|
p1.SetContainerBackupFactor(1)
|
||||||
|
x1 := netmap.ReplicaDescriptor{}
|
||||||
|
x1.SetECDataCount(1)
|
||||||
|
x1.SetECParityCount(1)
|
||||||
|
p1.AddReplicas(x1)
|
||||||
|
cnr.SetPlacementPolicy(p1)
|
||||||
|
cnr.SetAttribute("cnr", "cnr1")
|
||||||
|
|
||||||
|
cid := cidtest.ID()
|
||||||
|
|
||||||
|
// Create 4 nodes, 2 nodes for chunks,
|
||||||
|
// 2 nodes for the case when the first two will fail.
|
||||||
|
ns, _ := testNodeMatrix(t, []int{4})
|
||||||
|
|
||||||
|
data := make([]byte, 100)
|
||||||
|
_, _ = rand.Read(data)
|
||||||
|
ver := version.Current()
|
||||||
|
|
||||||
|
var csum checksum.Checksum
|
||||||
|
csum.SetSHA256(sha256.Sum256(data))
|
||||||
|
|
||||||
|
var csumTZ checksum.Checksum
|
||||||
|
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
|
||||||
|
|
||||||
|
obj := objectSDK.New()
|
||||||
|
obj.SetID(oidtest.ID())
|
||||||
|
obj.SetOwnerID(usertest.ID())
|
||||||
|
obj.SetContainerID(cid)
|
||||||
|
obj.SetVersion(&ver)
|
||||||
|
obj.SetPayload(data)
|
||||||
|
obj.SetPayloadSize(uint64(len(data)))
|
||||||
|
obj.SetPayloadChecksum(csum)
|
||||||
|
obj.SetPayloadHomomorphicHash(csumTZ)
|
||||||
|
|
||||||
|
// Builder return nodes without sort by hrw
|
||||||
|
builder := &testPlacementBuilder{
|
||||||
|
vectors: ns,
|
||||||
|
}
|
||||||
|
|
||||||
|
ownerKey, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
log, err := logger.NewLogger(nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var n nmKeys
|
||||||
|
ecw := ecWriter{
|
||||||
|
cfg: &cfg{
|
||||||
|
netmapKeys: n,
|
||||||
|
remotePool: pool,
|
||||||
|
log: log,
|
||||||
|
clientConstructor: clientConstructor{vectors: ns},
|
||||||
|
},
|
||||||
|
placementOpts: append(
|
||||||
|
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||||
|
placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||||
|
container: cnr,
|
||||||
|
key: &ownerKey.PrivateKey,
|
||||||
|
relay: nil,
|
||||||
|
objMetaValid: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ecw.WriteObject(context.Background(), obj)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
||||||
|
mNodes := make([][]netmap.NodeInfo, len(dim))
|
||||||
|
mAddr := make([][]string, len(dim))
|
||||||
|
|
||||||
|
for i := range dim {
|
||||||
|
ns := make([]netmap.NodeInfo, dim[i])
|
||||||
|
as := make([]string, dim[i])
|
||||||
|
|
||||||
|
for j := range dim[i] {
|
||||||
|
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
|
||||||
|
strconv.Itoa(i),
|
||||||
|
strconv.Itoa(60000+j),
|
||||||
|
)
|
||||||
|
|
||||||
|
var ni netmap.NodeInfo
|
||||||
|
ni.SetNetworkEndpoints(a)
|
||||||
|
ni.SetPublicKey([]byte(a))
|
||||||
|
|
||||||
|
var na network.AddressGroup
|
||||||
|
|
||||||
|
err := na.FromIterator(netmapcore.Node(ni))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
as[j] = network.StringifyGroup(na)
|
||||||
|
|
||||||
|
ns[j] = ni
|
||||||
|
}
|
||||||
|
|
||||||
|
mNodes[i] = ns
|
||||||
|
mAddr[i] = as
|
||||||
|
}
|
||||||
|
|
||||||
|
return mNodes, mAddr
|
||||||
|
}
|
|
@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
||||||
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||||
|
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||||
|
|
||||||
signer := &putSingleRequestSigner{
|
signer := &putSingleRequestSigner{
|
||||||
req: req,
|
req: req,
|
||||||
|
@ -209,9 +210,10 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
|
||||||
}
|
}
|
||||||
|
|
||||||
type putSinglePlacement struct {
|
type putSinglePlacement struct {
|
||||||
placementOptions []placement.Option
|
placementOptions []placement.Option
|
||||||
isEC bool
|
isEC bool
|
||||||
container containerSDK.Container
|
container containerSDK.Container
|
||||||
|
resetSuccessAfterOnBroadcast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
||||||
|
@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
||||||
}
|
}
|
||||||
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
||||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
||||||
|
result.resetSuccessAfterOnBroadcast = true
|
||||||
}
|
}
|
||||||
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||||
|
|
||||||
|
|
|
@ -233,16 +233,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var resetSuccessAfterOnBroadcast bool
|
||||||
traverseOpts := prm.traverseOpts
|
traverseOpts := prm.traverseOpts
|
||||||
if forECPlacement && !prm.common.LocalOnly() {
|
if forECPlacement && !prm.common.LocalOnly() {
|
||||||
// save non-regular and linking object to EC container.
|
// save non-regular and linking object to EC container.
|
||||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||||
|
resetSuccessAfterOnBroadcast = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
cfg: p.cfg,
|
cfg: p.cfg,
|
||||||
placementOpts: traverseOpts,
|
placementOpts: traverseOpts,
|
||||||
|
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||||
if node.local {
|
if node.local {
|
||||||
return localTarget{
|
return localTarget{
|
||||||
|
|
|
@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetSuccessAfter resets flat success number setting option.
|
||||||
|
func ResetSuccessAfter() Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.flatSuccess = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithoutSuccessTracking disables success tracking in traversal.
|
// WithoutSuccessTracking disables success tracking in traversal.
|
||||||
func WithoutSuccessTracking() Option {
|
func WithoutSuccessTracking() Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
|
1
staticcheck.conf
Normal file
1
staticcheck.conf
Normal file
|
@ -0,0 +1 @@
|
||||||
|
checks = ["inherit", "-SA1019"]
|
Loading…
Reference in a new issue