Compare commits
13 commits
v0.42.9
...
support/v0
Author | SHA1 | Date | |
---|---|---|---|
990e984023 | |||
a83dd231ec | |||
51f0bf8be0 | |||
52c04ef475 | |||
8423e0f6f9 | |||
08593f664b | |||
eec359cfa8 | |||
43d6fbf73b | |||
01e18eda43 | |||
3e89e744aa | |||
5aef303259 | |||
dd570c9344 | |||
149f8f4b08 |
36 changed files with 540 additions and 96 deletions
|
@ -8,7 +8,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
go_versions: [ '1.21', '1.22' ]
|
||||
go_versions: [ '1.22', '1.23' ]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
|
|
@ -16,7 +16,7 @@ jobs:
|
|||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.22
|
||||
go-version: 1.23
|
||||
- name: Set up Python
|
||||
run: |
|
||||
apt update
|
||||
|
|
|
@ -11,7 +11,7 @@ jobs:
|
|||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.22'
|
||||
go-version: '1.23'
|
||||
cache: true
|
||||
|
||||
- name: Install linters
|
||||
|
@ -25,7 +25,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
go_versions: [ '1.21', '1.22' ]
|
||||
go_versions: [ '1.22', '1.23' ]
|
||||
fail-fast: false
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
@ -48,7 +48,7 @@ jobs:
|
|||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.21'
|
||||
go-version: '1.22'
|
||||
cache: true
|
||||
|
||||
- name: Run tests
|
||||
|
@ -63,7 +63,7 @@ jobs:
|
|||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.22'
|
||||
go-version: '1.23'
|
||||
cache: true
|
||||
|
||||
- name: Install staticcheck
|
||||
|
@ -81,7 +81,7 @@ jobs:
|
|||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.21'
|
||||
go-version: '1.22'
|
||||
cache: true
|
||||
|
||||
- name: Install gopls
|
||||
|
|
|
@ -13,7 +13,7 @@ jobs:
|
|||
- name: Setup Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.22'
|
||||
go-version: '1.23'
|
||||
|
||||
- name: Install govulncheck
|
||||
run: go install golang.org/x/vuln/cmd/govulncheck@latest
|
||||
|
|
|
@ -12,7 +12,8 @@ run:
|
|||
# output configuration options
|
||||
output:
|
||||
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
|
||||
format: tab
|
||||
formats:
|
||||
- format: tab
|
||||
|
||||
# all available settings of specific linters
|
||||
linters-settings:
|
||||
|
|
10
Makefile
10
Makefile
|
@ -8,8 +8,8 @@ HUB_IMAGE ?= truecloudlab/frostfs
|
|||
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
|
||||
|
||||
GO_VERSION ?= 1.22
|
||||
LINT_VERSION ?= 1.56.1
|
||||
TRUECLOUDLAB_LINT_VERSION ?= 0.0.5
|
||||
LINT_VERSION ?= 1.60.1
|
||||
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
|
||||
PROTOC_VERSION ?= 25.0
|
||||
PROTOC_GEN_GO_VERSION ?= $(shell go list -f '{{.Version}}' -m google.golang.org/protobuf)
|
||||
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
|
||||
|
@ -17,7 +17,7 @@ PROTOC_OS_VERSION=osx-x86_64
|
|||
ifeq ($(shell uname), Linux)
|
||||
PROTOC_OS_VERSION=linux-x86_64
|
||||
endif
|
||||
STATICCHECK_VERSION ?= 2023.1.6
|
||||
STATICCHECK_VERSION ?= 2024.1.1
|
||||
ARCH = amd64
|
||||
|
||||
BIN = bin
|
||||
|
@ -190,7 +190,7 @@ lint-install:
|
|||
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
|
||||
@rm -rf $(TMP_DIR)/linters
|
||||
@rmdir $(TMP_DIR) 2>/dev/null || true
|
||||
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
||||
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install -trimpath github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
||||
|
||||
# Run linters
|
||||
lint:
|
||||
|
@ -210,7 +210,7 @@ staticcheck-run:
|
|||
@if [ ! -d "$(STATICCHECK_VERSION_DIR)" ]; then \
|
||||
make staticcheck-install; \
|
||||
fi
|
||||
@$(STATICCHECK_VERSION_DIR)/staticcheck ./...
|
||||
@$(STATICCHECK_VERSION_DIR)/staticcheck -checks inherit,-SA1019 ./...
|
||||
|
||||
# Install gopls
|
||||
gopls-install:
|
||||
|
|
|
@ -49,7 +49,7 @@ The latest version of frostfs-node works with frostfs-contract
|
|||
|
||||
# Building
|
||||
|
||||
To make all binaries you need Go 1.21+ and `make`:
|
||||
To make all binaries you need Go 1.22+ and `make`:
|
||||
```
|
||||
make all
|
||||
```
|
||||
|
|
|
@ -1070,7 +1070,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
|||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
||||
|
||||
err := ls.Close(context.Background())
|
||||
err := ls.Close(context.WithoutCancel(ctx))
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -21,7 +21,7 @@ func initTracing(ctx context.Context, c *cfg) {
|
|||
c.closers = append(c.closers, closer{
|
||||
name: "tracing",
|
||||
fn: func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*5)
|
||||
defer cancel()
|
||||
err := tracing.Shutdown(ctx) // cfg context cancels before close
|
||||
if err != nil {
|
||||
|
|
34
go.mod
34
go.mod
|
@ -1,14 +1,14 @@
|
|||
module git.frostfs.info/TrueCloudLab/frostfs-node
|
||||
|
||||
go 1.21
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240906121927-2c79f770e449
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240906135451-a7aabe53491c
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
|
@ -23,7 +23,7 @@ require (
|
|||
github.com/mitchellh/go-homedir v1.1.0
|
||||
github.com/mr-tron/base58 v1.2.0
|
||||
github.com/multiformats/go-multiaddr v0.12.1
|
||||
github.com/nspcc-dev/neo-go v0.106.2
|
||||
github.com/nspcc-dev/neo-go v0.106.3
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/panjf2000/ants/v2 v2.9.0
|
||||
github.com/paulmach/orb v0.11.0
|
||||
|
@ -39,11 +39,11 @@ require (
|
|||
go.opentelemetry.io/otel/trace v1.22.0
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
|
||||
golang.org/x/sync v0.6.0
|
||||
golang.org/x/sys v0.18.0
|
||||
golang.org/x/term v0.18.0
|
||||
google.golang.org/grpc v1.63.2
|
||||
google.golang.org/protobuf v1.33.0
|
||||
golang.org/x/sync v0.8.0
|
||||
golang.org/x/sys v0.25.0
|
||||
golang.org/x/term v0.24.0
|
||||
google.golang.org/grpc v1.66.0
|
||||
google.golang.org/protobuf v1.34.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
|
@ -60,10 +60,10 @@ require (
|
|||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bits-and-blooms/bitset v1.13.0 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/consensys/bavard v0.1.13 // indirect
|
||||
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/davidmz/go-pageant v1.0.2 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
|
@ -96,7 +96,7 @@ require (
|
|||
github.com/multiformats/go-multihash v0.2.3 // indirect
|
||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240727093519-1a48f1ce43ec // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
|
@ -118,11 +118,11 @@ require (
|
|||
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.21.0 // indirect
|
||||
golang.org/x/net v0.23.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
|
||||
golang.org/x/crypto v0.27.0 // indirect
|
||||
golang.org/x/net v0.29.0 // indirect
|
||||
golang.org/x/text v0.18.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
lukechampine.com/blake3 v1.2.1 // indirect
|
||||
rsc.io/tmplfunc v0.0.3 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -27,21 +27,21 @@ type objectDesc struct {
|
|||
storageID []byte
|
||||
}
|
||||
|
||||
func TestAll(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestAll(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
t.Run("get", func(t *testing.T) {
|
||||
TestGet(t, cons, min, max)
|
||||
TestGet(t, cons, minSize, maxSize)
|
||||
})
|
||||
t.Run("get range", func(t *testing.T) {
|
||||
TestGetRange(t, cons, min, max)
|
||||
TestGetRange(t, cons, minSize, maxSize)
|
||||
})
|
||||
t.Run("delete", func(t *testing.T) {
|
||||
TestDelete(t, cons, min, max)
|
||||
TestDelete(t, cons, minSize, maxSize)
|
||||
})
|
||||
t.Run("exists", func(t *testing.T) {
|
||||
TestExists(t, cons, min, max)
|
||||
TestExists(t, cons, minSize, maxSize)
|
||||
})
|
||||
t.Run("iterate", func(t *testing.T) {
|
||||
TestIterate(t, cons, min, max)
|
||||
TestIterate(t, cons, minSize, maxSize)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -51,12 +51,12 @@ func TestInfo(t *testing.T, cons Constructor, expectedType string, expectedPath
|
|||
require.Equal(t, expectedPath, s.Path())
|
||||
}
|
||||
|
||||
func prepare(t *testing.T, count int, s common.Storage, min, max uint64) []objectDesc {
|
||||
func prepare(t *testing.T, count int, s common.Storage, minSize, maxSize uint64) []objectDesc {
|
||||
objects := make([]objectDesc, count)
|
||||
|
||||
r := mrand.New(mrand.NewSource(0))
|
||||
for i := range objects {
|
||||
objects[i].obj = NewObject(min + uint64(r.Intn(int(max-min+1)))) // not too large
|
||||
objects[i].obj = NewObject(minSize + uint64(r.Intn(int(maxSize-minSize+1)))) // not too large
|
||||
objects[i].addr = objectCore.AddressOf(objects[i].obj)
|
||||
|
||||
raw, err := objects[i].obj.Marshal()
|
||||
|
|
|
@ -13,12 +13,12 @@ import (
|
|||
|
||||
// TestControl checks correctness of a read-only mode.
|
||||
// cons must return a storage which is NOT opened.
|
||||
func TestControl(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestControl(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
|
||||
objects := prepare(t, 10, s, min, max)
|
||||
objects := prepare(t, 10, s, minSize, maxSize)
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
require.NoError(t, s.Open(mode.ComponentReadOnly))
|
||||
|
@ -34,7 +34,7 @@ func TestControl(t *testing.T, cons Constructor, min, max uint64) {
|
|||
|
||||
t.Run("put fails", func(t *testing.T) {
|
||||
var prm common.PutPrm
|
||||
prm.Object = NewObject(min + uint64(rand.Intn(int(max-min+1))))
|
||||
prm.Object = NewObject(minSize + uint64(rand.Intn(int(maxSize-minSize+1))))
|
||||
prm.Address = objectCore.AddressOf(prm.Object)
|
||||
|
||||
_, err := s.Put(context.Background(), prm)
|
||||
|
|
|
@ -11,13 +11,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestDelete(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
defer func() { require.NoError(t, s.Close()) }()
|
||||
|
||||
objects := prepare(t, 4, s, min, max)
|
||||
objects := prepare(t, 4, s, minSize, maxSize)
|
||||
|
||||
t.Run("delete non-existent", func(t *testing.T) {
|
||||
var prm common.DeletePrm
|
||||
|
|
|
@ -10,13 +10,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExists(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestExists(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
defer func() { require.NoError(t, s.Close()) }()
|
||||
|
||||
objects := prepare(t, 1, s, min, max)
|
||||
objects := prepare(t, 1, s, minSize, maxSize)
|
||||
|
||||
t.Run("missing object", func(t *testing.T) {
|
||||
prm := common.ExistsPrm{Address: oidtest.Address()}
|
||||
|
|
|
@ -11,13 +11,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGet(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestGet(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
defer func() { require.NoError(t, s.Close()) }()
|
||||
|
||||
objects := prepare(t, 2, s, min, max)
|
||||
objects := prepare(t, 2, s, minSize, maxSize)
|
||||
|
||||
t.Run("missing object", func(t *testing.T) {
|
||||
gPrm := common.GetPrm{Address: oidtest.Address()}
|
||||
|
|
|
@ -13,13 +13,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestGetRange(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
defer func() { require.NoError(t, s.Close()) }()
|
||||
|
||||
objects := prepare(t, 1, s, min, max)
|
||||
objects := prepare(t, 1, s, minSize, maxSize)
|
||||
|
||||
t.Run("missing object", func(t *testing.T) {
|
||||
gPrm := common.GetRangePrm{Address: oidtest.Address()}
|
||||
|
|
|
@ -10,13 +10,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIterate(t *testing.T, cons Constructor, min, max uint64) {
|
||||
func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(mode.ComponentReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
defer func() { require.NoError(t, s.Close()) }()
|
||||
|
||||
objects := prepare(t, 10, s, min, max)
|
||||
objects := prepare(t, 10, s, minSize, maxSize)
|
||||
|
||||
// Delete random object to ensure it is not iterated over.
|
||||
const delID = 2
|
||||
|
|
|
@ -236,7 +236,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
|||
return err
|
||||
}
|
||||
} else if errors.As(err, &ecErr) {
|
||||
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
|
||||
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
|||
|
||||
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
||||
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
|
||||
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
|
||||
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
|
||||
) error {
|
||||
for _, chunk := range ecInfo.Chunks {
|
||||
chunkBuf := make([]byte, addressKeySize)
|
||||
|
@ -296,11 +296,11 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
|
||||
chunkKey := addressKey(chunkAddr, chunkBuf)
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkKey := addressKey(chunkAddr, chunkBuf)
|
||||
if tomb != nil {
|
||||
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
|
||||
if err != nil {
|
||||
|
|
116
pkg/local_object_storage/metabase/inhume_ec_test.go
Normal file
116
pkg/local_object_storage/metabase/inhume_ec_test.go
Normal file
|
@ -0,0 +1,116 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestInhumeECObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := New(
|
||||
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||
WithPermissions(0o600),
|
||||
WithEpochState(epochState{uint64(12)}),
|
||||
)
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ecChunk := oidtest.ID()
|
||||
ecChunk2 := oidtest.ID()
|
||||
ecParent := oidtest.ID()
|
||||
tombstoneID := oidtest.ID()
|
||||
|
||||
chunkObj := testutil.GenerateObjectWithCID(cnr)
|
||||
chunkObj.SetContainerID(cnr)
|
||||
chunkObj.SetID(ecChunk)
|
||||
chunkObj.SetPayload([]byte{0, 1, 2, 3, 4})
|
||||
chunkObj.SetPayloadSize(uint64(5))
|
||||
chunkObj.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent}, 0, 3, []byte{}, 0))
|
||||
|
||||
chunkObj2 := testutil.GenerateObjectWithCID(cnr)
|
||||
chunkObj2.SetContainerID(cnr)
|
||||
chunkObj2.SetID(ecChunk2)
|
||||
chunkObj2.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
chunkObj2.SetPayloadSize(uint64(10))
|
||||
chunkObj2.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent}, 1, 3, []byte{}, 0))
|
||||
|
||||
// put object with EC
|
||||
|
||||
var prm PutPrm
|
||||
prm.SetObject(chunkObj)
|
||||
prm.SetStorageID([]byte("0/0"))
|
||||
_, err := db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
prm.SetObject(chunkObj2)
|
||||
_, err = db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var ecChunkAddress oid.Address
|
||||
ecChunkAddress.SetContainer(cnr)
|
||||
ecChunkAddress.SetObject(ecChunk)
|
||||
|
||||
var ecParentAddress oid.Address
|
||||
ecParentAddress.SetContainer(cnr)
|
||||
ecParentAddress.SetObject(ecParent)
|
||||
|
||||
var chunkObjectAddress oid.Address
|
||||
chunkObjectAddress.SetContainer(cnr)
|
||||
chunkObjectAddress.SetObject(ecChunk)
|
||||
|
||||
var getPrm GetPrm
|
||||
|
||||
getPrm.SetAddress(ecChunkAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
getPrm.SetAddress(ecParentAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, &ecInfoError)
|
||||
require.True(t, len(ecInfoError.ECInfo().Chunks) == 2 &&
|
||||
ecInfoError.ECInfo().Chunks[0].Index == 0 &&
|
||||
ecInfoError.ECInfo().Chunks[0].Total == 3)
|
||||
|
||||
// inhume Chunk
|
||||
var inhumePrm InhumePrm
|
||||
var tombAddress oid.Address
|
||||
inhumePrm.SetAddresses(chunkObjectAddress)
|
||||
res, err := db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
require.True(t, len(res.deletionDetails) == 1)
|
||||
require.True(t, res.deletionDetails[0].Size == 5)
|
||||
|
||||
// inhume EC parent (like Delete does)
|
||||
tombAddress.SetContainer(cnr)
|
||||
tombAddress.SetObject(tombstoneID)
|
||||
inhumePrm.SetAddresses(ecParentAddress)
|
||||
inhumePrm.SetTombstoneAddress(tombAddress)
|
||||
res, err = db.Inhume(context.Background(), inhumePrm)
|
||||
require.NoError(t, err)
|
||||
// Previously deleted chunk shouldn't be in the details, because it is marked as garbage
|
||||
require.True(t, len(res.deletionDetails) == 1)
|
||||
require.True(t, res.deletionDetails[0].Size == 10)
|
||||
|
||||
getPrm.SetAddress(ecParentAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
|
||||
getPrm.SetAddress(ecChunkAddress)
|
||||
_, err = db.Get(context.Background(), getPrm)
|
||||
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
|
||||
}
|
|
@ -1161,6 +1161,7 @@ func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *f
|
|||
lastFilename = nil
|
||||
nodes = nil
|
||||
length = actualLength + 1
|
||||
count = 0
|
||||
c.Seek(append(prefix, byte(length), byte(length>>8)))
|
||||
c.Prev() // c.Next() will be performed by for loop
|
||||
}
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
mrand "math/rand"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -232,6 +234,65 @@ func BenchmarkForestSortedIteration(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
// The issue which we call "BugWithSkip" is easiest to understand when filenames are
|
||||
// monotonically increasing numbers. We want the list of sorted filenames to have different length interleaved.
|
||||
// The bug happens when we switch between length during listing.
|
||||
// Thus this test contains numbers from 1 to 2000 and batch size of size 10.
|
||||
func TestForest_TreeSortedIterationBugWithSkip(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeSortedIterationBugWithSkip(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeSortedIterationBugWithSkip(t *testing.T, s ForestStorage) {
|
||||
defer func() { require.NoError(t, s.Close()) }()
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
treeAdd := func(t *testing.T, ts int, filename string) {
|
||||
_, err := s.TreeMove(context.Background(), d, treeID, &Move{
|
||||
Child: RootID + uint64(ts),
|
||||
Parent: RootID,
|
||||
Meta: Meta{
|
||||
Time: Timestamp(ts),
|
||||
Items: []KeyValue{
|
||||
{Key: AttributeFilename, Value: []byte(filename)},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
const count = 2000
|
||||
treeAdd(t, 1, "")
|
||||
for i := 1; i < count; i++ {
|
||||
treeAdd(t, i+1, strconv.Itoa(i+1))
|
||||
}
|
||||
|
||||
var result []MultiNodeInfo
|
||||
treeAppend := func(t *testing.T, last *string, count int) *string {
|
||||
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, MultiNode{RootID}, last, count)
|
||||
require.NoError(t, err)
|
||||
result = append(result, res...)
|
||||
return cursor
|
||||
}
|
||||
|
||||
const batchSize = 10
|
||||
last := treeAppend(t, nil, batchSize)
|
||||
for i := 1; i < count/batchSize; i++ {
|
||||
last = treeAppend(t, last, batchSize)
|
||||
}
|
||||
require.Len(t, result, count)
|
||||
require.True(t, slices.IsSortedFunc(result, func(a, b MultiNodeInfo) int {
|
||||
filenameA := findAttr(a.Meta, AttributeFilename)
|
||||
filenameB := findAttr(b.Meta, AttributeFilename)
|
||||
return bytes.Compare(filenameA, filenameB)
|
||||
}))
|
||||
}
|
||||
|
||||
func TestForest_TreeSortedIteration(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
|
|
|
@ -2,6 +2,8 @@ package pilorama
|
|||
|
||||
import (
|
||||
"container/heap"
|
||||
"slices"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type heapInfo struct {
|
||||
|
@ -28,9 +30,10 @@ func (h *filenameHeap) Pop() any {
|
|||
|
||||
// fixedHeap maintains a fixed number of smallest elements started at some point.
|
||||
type fixedHeap struct {
|
||||
start *string
|
||||
count int
|
||||
h *filenameHeap
|
||||
start *string
|
||||
sorted bool
|
||||
count int
|
||||
h *filenameHeap
|
||||
}
|
||||
|
||||
func newHeap(start *string, count int) *fixedHeap {
|
||||
|
@ -44,20 +47,39 @@ func newHeap(start *string, count int) *fixedHeap {
|
|||
}
|
||||
}
|
||||
|
||||
const amortizationMultiplier = 5
|
||||
|
||||
func (h *fixedHeap) push(id MultiNode, filename string) bool {
|
||||
if h.start != nil && filename <= *h.start {
|
||||
return false
|
||||
}
|
||||
heap.Push(h.h, heapInfo{id: id, filename: filename})
|
||||
if h.h.Len() > h.count {
|
||||
heap.Remove(h.h, h.h.Len()-1)
|
||||
|
||||
*h.h = append(*h.h, heapInfo{id: id, filename: filename})
|
||||
h.sorted = false
|
||||
|
||||
if h.h.Len() > h.count*amortizationMultiplier {
|
||||
slices.SortFunc(*h.h, func(a, b heapInfo) int {
|
||||
return strings.Compare(a.filename, b.filename)
|
||||
})
|
||||
*h.h = (*h.h)[:h.count]
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (h *fixedHeap) pop() (heapInfo, bool) {
|
||||
if h.h.Len() != 0 {
|
||||
return heap.Pop(h.h).(heapInfo), true
|
||||
if !h.sorted {
|
||||
slices.SortFunc(*h.h, func(a, b heapInfo) int {
|
||||
return strings.Compare(a.filename, b.filename)
|
||||
})
|
||||
if len(*h.h) > h.count {
|
||||
*h.h = (*h.h)[:h.count]
|
||||
}
|
||||
h.sorted = true
|
||||
}
|
||||
if len(*h.h) != 0 {
|
||||
info := (*h.h)[0]
|
||||
*h.h = (*h.h)[1:]
|
||||
return info, true
|
||||
}
|
||||
return heapInfo{}, false
|
||||
}
|
||||
|
|
|
@ -641,8 +641,8 @@ func (c *Client) notaryTxValidationLimit() (uint32, error) {
|
|||
return 0, fmt.Errorf("can't get current blockchain height: %w", err)
|
||||
}
|
||||
|
||||
min := bc + c.notary.txValidTime
|
||||
rounded := (min/c.notary.roundTime + 1) * c.notary.roundTime
|
||||
minTime := bc + c.notary.txValidTime
|
||||
rounded := (minTime/c.notary.roundTime + 1) * c.notary.roundTime
|
||||
|
||||
return rounded, nil
|
||||
}
|
||||
|
|
|
@ -33,10 +33,15 @@ func (a *auditService) AddChain(ctx context.Context, req *apemanager.AddChainReq
|
|||
return res, err
|
||||
}
|
||||
|
||||
var respChainID []byte
|
||||
if respBody := res.GetBody(); respBody != nil {
|
||||
respChainID = respBody.GetChainID()
|
||||
}
|
||||
|
||||
audit.LogRequest(a.log, ape_grpc.APEManagerService_AddChain_FullMethodName, req,
|
||||
audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(),
|
||||
req.GetBody().GetTarget().GetName(),
|
||||
res.GetBody().GetChainID()),
|
||||
respChainID),
|
||||
err == nil)
|
||||
|
||||
return res, err
|
||||
|
|
|
@ -2,9 +2,11 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -120,6 +122,12 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.Error(exec.err),
|
||||
)
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if execCnr && errors.As(exec.err, &errAccessDenied) {
|
||||
// Local get can't return access denied error, so this error was returned by
|
||||
// write to the output stream. So there is no need to try to find object on other nodes.
|
||||
return
|
||||
}
|
||||
|
||||
if execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
|
|
|
@ -31,6 +31,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
@ -38,7 +39,11 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
if r.status != statusEC {
|
||||
// for raw requests, continue to collect other parts
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
r.err = err
|
||||
} else {
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
}
|
||||
}
|
||||
return false
|
||||
case err == nil:
|
||||
|
|
|
@ -23,12 +23,14 @@ import (
|
|||
)
|
||||
|
||||
type getRequestForwarder struct {
|
||||
OnceResign sync.Once
|
||||
OnceHeaderSending sync.Once
|
||||
GlobalProgress int
|
||||
Key *ecdsa.PrivateKey
|
||||
Request *objectV2.GetRequest
|
||||
Stream *streamObjectWriter
|
||||
OnceResign sync.Once
|
||||
GlobalProgress int
|
||||
Key *ecdsa.PrivateKey
|
||||
Request *objectV2.GetRequest
|
||||
Stream *streamObjectWriter
|
||||
|
||||
headerSent bool
|
||||
headerSentGuard sync.Mutex
|
||||
}
|
||||
|
||||
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*objectSDK.Object, error) {
|
||||
|
@ -83,13 +85,15 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
|
|||
obj.SetSignature(v.GetSignature())
|
||||
obj.SetHeader(v.GetHeader())
|
||||
|
||||
var err error
|
||||
f.OnceHeaderSending.Do(func() {
|
||||
err = f.Stream.WriteHeader(ctx, objectSDK.NewFromV2(obj))
|
||||
})
|
||||
if err != nil {
|
||||
f.headerSentGuard.Lock()
|
||||
defer f.headerSentGuard.Unlock()
|
||||
if f.headerSent {
|
||||
return nil
|
||||
}
|
||||
if err := f.Stream.WriteHeader(ctx, objectSDK.NewFromV2(obj)); err != nil {
|
||||
return errCouldNotWriteObjHeader(err)
|
||||
}
|
||||
f.headerSent = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ type distributedTarget struct {
|
|||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||
|
||||
relay func(context.Context, nodeDesc) error
|
||||
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
// parameters and state of container traversal.
|
||||
|
@ -35,6 +37,8 @@ type traversal struct {
|
|||
|
||||
// container nodes which was processed during the primary object placement
|
||||
mExclude map[string]*bool
|
||||
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
// updates traversal parameters after the primary placement finish and
|
||||
|
@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
|||
// do not track success during container broadcast (best-effort)
|
||||
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
||||
|
||||
if x.resetSuccessAfterOnBroadcast {
|
||||
x.opts = append(x.opts, placement.ResetSuccessAfter())
|
||||
}
|
||||
|
||||
// avoid 2nd broadcast
|
||||
x.extraBroadcastEnabled = false
|
||||
|
||||
|
@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
|||
|
||||
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||
return iter.forEachNode(ctx, t.sendObject)
|
||||
}
|
||||
|
|
|
@ -197,14 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partsProcessed := make([]atomic.Bool, len(parts))
|
||||
objID, _ := obj.ID()
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
nodes := t.Next()
|
||||
if len(nodes) == 0 {
|
||||
break
|
||||
|
@ -216,14 +217,20 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
}
|
||||
|
||||
for idx := range parts {
|
||||
idx := idx
|
||||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
if !partsProcessed[idx].Load() {
|
||||
eg.Go(func() error {
|
||||
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
if err == nil {
|
||||
partsProcessed[idx].Store(true)
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
err = eg.Wait()
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
if err != nil {
|
||||
return errIncompletePut{
|
||||
singleErr: err,
|
||||
}
|
||||
|
|
191
pkg/services/object/put/ec_test.go
Normal file
191
pkg/services/object/put/ec_test.go
Normal file
|
@ -0,0 +1,191 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testPlacementBuilder struct {
|
||||
vectors [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
|
||||
[][]netmap.NodeInfo, error,
|
||||
) {
|
||||
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
|
||||
copy(arr, p.vectors[0])
|
||||
return [][]netmap.NodeInfo{arr}, nil
|
||||
}
|
||||
|
||||
type nmKeys struct{}
|
||||
|
||||
func (nmKeys) IsLocalKey(_ []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type clientConstructor struct {
|
||||
vectors [][]netmap.NodeInfo
|
||||
}
|
||||
|
||||
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
|
||||
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
|
||||
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
|
||||
return multiAddressClient{err: errors.New("node unavailable")}, nil
|
||||
}
|
||||
return multiAddressClient{}, nil
|
||||
}
|
||||
|
||||
type multiAddressClient struct {
|
||||
client.MultiAddressClient
|
||||
err error
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
|
||||
if c.err != nil {
|
||||
return nil, c.err
|
||||
}
|
||||
return &apiclient.ResObjectPutSingle{}, nil
|
||||
}
|
||||
|
||||
func (c multiAddressClient) ReportError(error) {
|
||||
}
|
||||
|
||||
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestECWriter(t *testing.T) {
|
||||
// Create container with policy EC 1.1
|
||||
cnr := container.Container{}
|
||||
p1 := netmap.PlacementPolicy{}
|
||||
p1.SetContainerBackupFactor(1)
|
||||
x1 := netmap.ReplicaDescriptor{}
|
||||
x1.SetECDataCount(1)
|
||||
x1.SetECParityCount(1)
|
||||
p1.AddReplicas(x1)
|
||||
cnr.SetPlacementPolicy(p1)
|
||||
cnr.SetAttribute("cnr", "cnr1")
|
||||
|
||||
cid := cidtest.ID()
|
||||
|
||||
// Create 4 nodes, 2 nodes for chunks,
|
||||
// 2 nodes for the case when the first two will fail.
|
||||
ns, _ := testNodeMatrix(t, []int{4})
|
||||
|
||||
data := make([]byte, 100)
|
||||
_, _ = rand.Read(data)
|
||||
ver := version.Current()
|
||||
|
||||
var csum checksum.Checksum
|
||||
csum.SetSHA256(sha256.Sum256(data))
|
||||
|
||||
var csumTZ checksum.Checksum
|
||||
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
|
||||
|
||||
obj := objectSDK.New()
|
||||
obj.SetID(oidtest.ID())
|
||||
obj.SetOwnerID(usertest.ID())
|
||||
obj.SetContainerID(cid)
|
||||
obj.SetVersion(&ver)
|
||||
obj.SetPayload(data)
|
||||
obj.SetPayloadSize(uint64(len(data)))
|
||||
obj.SetPayloadChecksum(csum)
|
||||
obj.SetPayloadHomomorphicHash(csumTZ)
|
||||
|
||||
// Builder return nodes without sort by hrw
|
||||
builder := &testPlacementBuilder{
|
||||
vectors: ns,
|
||||
}
|
||||
|
||||
ownerKey, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||
require.NoError(t, err)
|
||||
|
||||
log, err := logger.NewLogger(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var n nmKeys
|
||||
ecw := ecWriter{
|
||||
cfg: &cfg{
|
||||
netmapKeys: n,
|
||||
remotePool: pool,
|
||||
log: log,
|
||||
clientConstructor: clientConstructor{vectors: ns},
|
||||
},
|
||||
placementOpts: append(
|
||||
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||
placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
container: cnr,
|
||||
key: &ownerKey.PrivateKey,
|
||||
relay: nil,
|
||||
objMetaValid: true,
|
||||
}
|
||||
|
||||
err = ecw.WriteObject(context.Background(), obj)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
||||
mNodes := make([][]netmap.NodeInfo, len(dim))
|
||||
mAddr := make([][]string, len(dim))
|
||||
|
||||
for i := range dim {
|
||||
ns := make([]netmap.NodeInfo, dim[i])
|
||||
as := make([]string, dim[i])
|
||||
|
||||
for j := range dim[i] {
|
||||
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
|
||||
strconv.Itoa(i),
|
||||
strconv.Itoa(60000+j),
|
||||
)
|
||||
|
||||
var ni netmap.NodeInfo
|
||||
ni.SetNetworkEndpoints(a)
|
||||
ni.SetPublicKey([]byte(a))
|
||||
|
||||
var na network.AddressGroup
|
||||
|
||||
err := na.FromIterator(netmapcore.Node(ni))
|
||||
require.NoError(t, err)
|
||||
|
||||
as[j] = network.StringifyGroup(na)
|
||||
|
||||
ns[j] = ni
|
||||
}
|
||||
|
||||
mNodes[i] = ns
|
||||
mAddr[i] = as
|
||||
}
|
||||
|
||||
return mNodes, mAddr
|
||||
}
|
|
@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
|||
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
|
@ -209,9 +210,10 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
|
|||
}
|
||||
|
||||
type putSinglePlacement struct {
|
||||
placementOptions []placement.Option
|
||||
isEC bool
|
||||
container containerSDK.Container
|
||||
placementOptions []placement.Option
|
||||
isEC bool
|
||||
container containerSDK.Container
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
||||
|
@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
}
|
||||
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
||||
result.resetSuccessAfterOnBroadcast = true
|
||||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||
|
||||
|
|
|
@ -233,16 +233,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool)
|
|||
}
|
||||
}
|
||||
|
||||
var resetSuccessAfterOnBroadcast bool
|
||||
traverseOpts := prm.traverseOpts
|
||||
if forECPlacement && !prm.common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||
resetSuccessAfterOnBroadcast = true
|
||||
}
|
||||
|
||||
return &distributedTarget{
|
||||
cfg: p.cfg,
|
||||
placementOpts: traverseOpts,
|
||||
cfg: p.cfg,
|
||||
placementOpts: traverseOpts,
|
||||
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||
if node.local {
|
||||
return localTarget{
|
||||
|
|
|
@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// ResetSuccessAfter resets flat success number setting option.
|
||||
func ResetSuccessAfter() Option {
|
||||
return func(c *cfg) {
|
||||
c.flatSuccess = nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithoutSuccessTracking disables success tracking in traversal.
|
||||
func WithoutSuccessTracking() Option {
|
||||
return func(c *cfg) {
|
||||
|
|
|
@ -138,7 +138,7 @@ func (s *Service) checkAPE(ctx context.Context, bt *bearer.Token,
|
|||
|
||||
request, err := s.newAPERequest(ctx, namespace, cid, operation, role, publicKey)
|
||||
if err != nil {
|
||||
return apeErr(err)
|
||||
return fmt.Errorf("failed to create ape request: %w", err)
|
||||
}
|
||||
|
||||
var cr engine.ChainRouter
|
||||
|
@ -167,7 +167,7 @@ func (s *Service) checkAPE(ctx context.Context, bt *bearer.Token,
|
|||
rt := engine.NewRequestTargetExtended(namespace, cid.EncodeToString(), fmt.Sprintf("%s:%s", namespace, publicKey.Address()), groups)
|
||||
status, found, err := cr.IsAllowed(apechain.Ingress, rt, request)
|
||||
if err != nil {
|
||||
return apeErr(err)
|
||||
return err
|
||||
}
|
||||
if found && status == apechain.Allow {
|
||||
return nil
|
||||
|
|
1
staticcheck.conf
Normal file
1
staticcheck.conf
Normal file
|
@ -0,0 +1 @@
|
|||
checks = ["inherit", "-SA1019"]
|
Loading…
Reference in a new issue