Compare commits

...

10 commits

Author SHA1 Message Date
990e984023 [#1427] object: Fix Put for EC object when node unavailable
There might be situation when context canceled earlier than traverser move to another part of the nodes.
To avoid this, need to wait for the result from concurrent put at each traverser iteration.

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-11-12 12:55:31 +03:00
a83dd231ec [#1358] go.mod: Update api-go, sdk-go
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-09-06 17:05:40 +03:00
51f0bf8be0 [#1316] go.mod: Bump go version to 1.22
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-09-06 17:05:40 +03:00
52c04ef475 [#1341] Makefile: Build linter with -trimpath
Fix error with go1.23:
```
Error: build linters: unable to load custom analyzer "truecloudlab-linters": ../linters/bin/external_linters.so, plugin.Open("/repo/frostfs/linters/bin/external_linters"): plugin was built with a different version of package cmp
Failed executing command with error: build linters: unable to load custom analyzer "truecloudlab-linters": ../linters/bin/external_linters.so, plugin.Open("/repo/frostfs/linters/bin/external_linters"): plugin was built with a different version of package cmp
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-09-06 17:05:40 +03:00
8423e0f6f9 [#1316] lint: Fix warnings
Renamed parameters `min/max` to avoid conflicts with
predeclared identifiers.

Replaced background context with parent context without
cancellation in closer functions in frostfs-node.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-09-06 17:05:40 +03:00
08593f664b [#1329] putSvc: Reset SuccessAfter for non-EC objects in EC container broadcasting
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-05 16:11:01 +03:00
eec359cfa8 [#1351] apemanager: Fix AddChain handler for audit middleware
* `GetChainID` from `frostfs-api-go/v2` does not handler nil-pointer
  response body that leads to the panic after its call.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-09-04 15:57:34 +03:00
43d6fbf73b [#1339] getSvc: Fix access denied error handling
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-28 14:00:57 +03:00
01e18eda43 [#1328] pilorama: Do not skip items in SortedByFilename
Benchmark results:
```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
                                   │     old     │                 new                 │
                                   │   sec/op    │   sec/op     vs base                │
ForestSortedIteration/bbolt,root-8   207.2µ ± 6%   173.6µ ± 6%  -16.23% (p=0.000 n=10)
ForestSortedIteration/bbolt,leaf-8   3.910µ ± 5%   3.928µ ± 7%        ~ (p=0.529 n=10)
geomean                              28.46µ        26.11µ        -8.27%
```

They are not representative, as the worst case is when we have multiple
items of different lengths. However, `FileName` is usually less than 100
in practice, so the asymptotics is the same.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-08-23 13:41:01 +03:00
3e89e744aa [#1328] pilorama: Add tricky test for SortedByFilename
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-08-23 13:23:15 +03:00
33 changed files with 418 additions and 90 deletions

View file

@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go_versions: [ '1.21', '1.22' ]
go_versions: [ '1.22', '1.23' ]
steps:
- uses: actions/checkout@v3

View file

@ -16,7 +16,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.22
go-version: 1.23
- name: Set up Python
run: |
apt update

View file

@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.23'
cache: true
- name: Install linters
@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go_versions: [ '1.21', '1.22' ]
go_versions: [ '1.22', '1.23' ]
fail-fast: false
steps:
- uses: actions/checkout@v3
@ -48,7 +48,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
go-version: '1.22'
cache: true
- name: Run tests
@ -63,7 +63,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.23'
cache: true
- name: Install staticcheck
@ -81,7 +81,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
go-version: '1.22'
cache: true
- name: Install gopls

View file

@ -13,7 +13,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: '1.22'
go-version: '1.23'
- name: Install govulncheck
run: go install golang.org/x/vuln/cmd/govulncheck@latest

View file

@ -12,7 +12,8 @@ run:
# output configuration options
output:
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
format: tab
formats:
- format: tab
# all available settings of specific linters
linters-settings:

View file

@ -8,8 +8,8 @@ HUB_IMAGE ?= truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.22
LINT_VERSION ?= 1.56.1
TRUECLOUDLAB_LINT_VERSION ?= 0.0.5
LINT_VERSION ?= 1.60.1
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
PROTOC_VERSION ?= 25.0
PROTOC_GEN_GO_VERSION ?= $(shell go list -f '{{.Version}}' -m google.golang.org/protobuf)
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-api-go/v2)
@ -17,7 +17,7 @@ PROTOC_OS_VERSION=osx-x86_64
ifeq ($(shell uname), Linux)
PROTOC_OS_VERSION=linux-x86_64
endif
STATICCHECK_VERSION ?= 2023.1.6
STATICCHECK_VERSION ?= 2024.1.1
ARCH = amd64
BIN = bin
@ -190,7 +190,7 @@ lint-install:
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
@rm -rf $(TMP_DIR)/linters
@rmdir $(TMP_DIR) 2>/dev/null || true
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install -trimpath github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
# Run linters
lint:
@ -210,7 +210,7 @@ staticcheck-run:
@if [ ! -d "$(STATICCHECK_VERSION_DIR)" ]; then \
make staticcheck-install; \
fi
@$(STATICCHECK_VERSION_DIR)/staticcheck ./...
@$(STATICCHECK_VERSION_DIR)/staticcheck -checks inherit,-SA1019 ./...
# Install gopls
gopls-install:

View file

@ -49,7 +49,7 @@ The latest version of frostfs-node works with frostfs-contract
# Building
To make all binaries you need Go 1.21+ and `make`:
To make all binaries you need Go 1.22+ and `make`:
```
make all
```

View file

@ -1070,7 +1070,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
err := ls.Close(context.Background())
err := ls.Close(context.WithoutCancel(ctx))
if err != nil {
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
zap.String("error", err.Error()),

View file

@ -21,7 +21,7 @@ func initTracing(ctx context.Context, c *cfg) {
c.closers = append(c.closers, closer{
name: "tracing",
fn: func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*5)
defer cancel()
err := tracing.Shutdown(ctx) // cfg context cancels before close
if err != nil {

34
go.mod
View file

@ -1,14 +1,14 @@
module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.21
go 1.22
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240906121927-2c79f770e449
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240906135451-a7aabe53491c
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
@ -23,7 +23,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/nspcc-dev/neo-go v0.106.2
github.com/nspcc-dev/neo-go v0.106.3
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0
github.com/paulmach/orb v0.11.0
@ -39,11 +39,11 @@ require (
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0
golang.org/x/term v0.18.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.25.0
golang.org/x/term v0.24.0
google.golang.org/grpc v1.66.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
@ -60,10 +60,10 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davidmz/go-pageant v1.0.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
@ -96,7 +96,7 @@ require (
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240727093519-1a48f1ce43ec // indirect
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@ -118,11 +118,11 @@ require (
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -27,21 +27,21 @@ type objectDesc struct {
storageID []byte
}
func TestAll(t *testing.T, cons Constructor, min, max uint64) {
func TestAll(t *testing.T, cons Constructor, minSize, maxSize uint64) {
t.Run("get", func(t *testing.T) {
TestGet(t, cons, min, max)
TestGet(t, cons, minSize, maxSize)
})
t.Run("get range", func(t *testing.T) {
TestGetRange(t, cons, min, max)
TestGetRange(t, cons, minSize, maxSize)
})
t.Run("delete", func(t *testing.T) {
TestDelete(t, cons, min, max)
TestDelete(t, cons, minSize, maxSize)
})
t.Run("exists", func(t *testing.T) {
TestExists(t, cons, min, max)
TestExists(t, cons, minSize, maxSize)
})
t.Run("iterate", func(t *testing.T) {
TestIterate(t, cons, min, max)
TestIterate(t, cons, minSize, maxSize)
})
}
@ -51,12 +51,12 @@ func TestInfo(t *testing.T, cons Constructor, expectedType string, expectedPath
require.Equal(t, expectedPath, s.Path())
}
func prepare(t *testing.T, count int, s common.Storage, min, max uint64) []objectDesc {
func prepare(t *testing.T, count int, s common.Storage, minSize, maxSize uint64) []objectDesc {
objects := make([]objectDesc, count)
r := mrand.New(mrand.NewSource(0))
for i := range objects {
objects[i].obj = NewObject(min + uint64(r.Intn(int(max-min+1)))) // not too large
objects[i].obj = NewObject(minSize + uint64(r.Intn(int(maxSize-minSize+1)))) // not too large
objects[i].addr = objectCore.AddressOf(objects[i].obj)
raw, err := objects[i].obj.Marshal()

View file

@ -13,12 +13,12 @@ import (
// TestControl checks correctness of a read-only mode.
// cons must return a storage which is NOT opened.
func TestControl(t *testing.T, cons Constructor, min, max uint64) {
func TestControl(t *testing.T, cons Constructor, minSize, maxSize uint64) {
s := cons(t)
require.NoError(t, s.Open(mode.ComponentReadWrite))
require.NoError(t, s.Init())
objects := prepare(t, 10, s, min, max)
objects := prepare(t, 10, s, minSize, maxSize)
require.NoError(t, s.Close())
require.NoError(t, s.Open(mode.ComponentReadOnly))
@ -34,7 +34,7 @@ func TestControl(t *testing.T, cons Constructor, min, max uint64) {
t.Run("put fails", func(t *testing.T) {
var prm common.PutPrm
prm.Object = NewObject(min + uint64(rand.Intn(int(max-min+1))))
prm.Object = NewObject(minSize + uint64(rand.Intn(int(maxSize-minSize+1))))
prm.Address = objectCore.AddressOf(prm.Object)
_, err := s.Put(context.Background(), prm)

View file

@ -11,13 +11,13 @@ import (
"github.com/stretchr/testify/require"
)
func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
func TestDelete(t *testing.T, cons Constructor, minSize, maxSize uint64) {
s := cons(t)
require.NoError(t, s.Open(mode.ComponentReadWrite))
require.NoError(t, s.Init())
defer func() { require.NoError(t, s.Close()) }()
objects := prepare(t, 4, s, min, max)
objects := prepare(t, 4, s, minSize, maxSize)
t.Run("delete non-existent", func(t *testing.T) {
var prm common.DeletePrm

View file

@ -10,13 +10,13 @@ import (
"github.com/stretchr/testify/require"
)
func TestExists(t *testing.T, cons Constructor, min, max uint64) {
func TestExists(t *testing.T, cons Constructor, minSize, maxSize uint64) {
s := cons(t)
require.NoError(t, s.Open(mode.ComponentReadWrite))
require.NoError(t, s.Init())
defer func() { require.NoError(t, s.Close()) }()
objects := prepare(t, 1, s, min, max)
objects := prepare(t, 1, s, minSize, maxSize)
t.Run("missing object", func(t *testing.T) {
prm := common.ExistsPrm{Address: oidtest.Address()}

View file

@ -11,13 +11,13 @@ import (
"github.com/stretchr/testify/require"
)
func TestGet(t *testing.T, cons Constructor, min, max uint64) {
func TestGet(t *testing.T, cons Constructor, minSize, maxSize uint64) {
s := cons(t)
require.NoError(t, s.Open(mode.ComponentReadWrite))
require.NoError(t, s.Init())
defer func() { require.NoError(t, s.Close()) }()
objects := prepare(t, 2, s, min, max)
objects := prepare(t, 2, s, minSize, maxSize)
t.Run("missing object", func(t *testing.T) {
gPrm := common.GetPrm{Address: oidtest.Address()}

View file

@ -13,13 +13,13 @@ import (
"github.com/stretchr/testify/require"
)
func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
func TestGetRange(t *testing.T, cons Constructor, minSize, maxSize uint64) {
s := cons(t)
require.NoError(t, s.Open(mode.ComponentReadWrite))
require.NoError(t, s.Init())
defer func() { require.NoError(t, s.Close()) }()
objects := prepare(t, 1, s, min, max)
objects := prepare(t, 1, s, minSize, maxSize)
t.Run("missing object", func(t *testing.T) {
gPrm := common.GetRangePrm{Address: oidtest.Address()}

View file

@ -10,13 +10,13 @@ import (
"github.com/stretchr/testify/require"
)
func TestIterate(t *testing.T, cons Constructor, min, max uint64) {
func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
s := cons(t)
require.NoError(t, s.Open(mode.ComponentReadWrite))
require.NoError(t, s.Init())
defer func() { require.NoError(t, s.Close()) }()
objects := prepare(t, 10, s, min, max)
objects := prepare(t, 10, s, minSize, maxSize)
// Delete random object to ensure it is not iterated over.
const delID = 2

View file

@ -1161,6 +1161,7 @@ func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *f
lastFilename = nil
nodes = nil
length = actualLength + 1
count = 0
c.Seek(append(prefix, byte(length), byte(length>>8)))
c.Prev() // c.Next() will be performed by for loop
}

View file

@ -1,11 +1,13 @@
package pilorama
import (
"bytes"
"context"
"crypto/rand"
"fmt"
mrand "math/rand"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
@ -232,6 +234,65 @@ func BenchmarkForestSortedIteration(b *testing.B) {
}
}
// The issue which we call "BugWithSkip" is easiest to understand when filenames are
// monotonically increasing numbers. We want the list of sorted filenames to have different length interleaved.
// The bug happens when we switch between length during listing.
// Thus this test contains numbers from 1 to 2000 and batch size of size 10.
func TestForest_TreeSortedIterationBugWithSkip(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeSortedIterationBugWithSkip(t, providers[i].construct(t))
})
}
}
func testForestTreeSortedIterationBugWithSkip(t *testing.T, s ForestStorage) {
defer func() { require.NoError(t, s.Close()) }()
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
treeAdd := func(t *testing.T, ts int, filename string) {
_, err := s.TreeMove(context.Background(), d, treeID, &Move{
Child: RootID + uint64(ts),
Parent: RootID,
Meta: Meta{
Time: Timestamp(ts),
Items: []KeyValue{
{Key: AttributeFilename, Value: []byte(filename)},
},
},
})
require.NoError(t, err)
}
const count = 2000
treeAdd(t, 1, "")
for i := 1; i < count; i++ {
treeAdd(t, i+1, strconv.Itoa(i+1))
}
var result []MultiNodeInfo
treeAppend := func(t *testing.T, last *string, count int) *string {
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, MultiNode{RootID}, last, count)
require.NoError(t, err)
result = append(result, res...)
return cursor
}
const batchSize = 10
last := treeAppend(t, nil, batchSize)
for i := 1; i < count/batchSize; i++ {
last = treeAppend(t, last, batchSize)
}
require.Len(t, result, count)
require.True(t, slices.IsSortedFunc(result, func(a, b MultiNodeInfo) int {
filenameA := findAttr(a.Meta, AttributeFilename)
filenameB := findAttr(b.Meta, AttributeFilename)
return bytes.Compare(filenameA, filenameB)
}))
}
func TestForest_TreeSortedIteration(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {

View file

@ -2,6 +2,8 @@ package pilorama
import (
"container/heap"
"slices"
"strings"
)
type heapInfo struct {
@ -28,9 +30,10 @@ func (h *filenameHeap) Pop() any {
// fixedHeap maintains a fixed number of smallest elements started at some point.
type fixedHeap struct {
start *string
count int
h *filenameHeap
start *string
sorted bool
count int
h *filenameHeap
}
func newHeap(start *string, count int) *fixedHeap {
@ -44,20 +47,39 @@ func newHeap(start *string, count int) *fixedHeap {
}
}
const amortizationMultiplier = 5
func (h *fixedHeap) push(id MultiNode, filename string) bool {
if h.start != nil && filename <= *h.start {
return false
}
heap.Push(h.h, heapInfo{id: id, filename: filename})
if h.h.Len() > h.count {
heap.Remove(h.h, h.h.Len()-1)
*h.h = append(*h.h, heapInfo{id: id, filename: filename})
h.sorted = false
if h.h.Len() > h.count*amortizationMultiplier {
slices.SortFunc(*h.h, func(a, b heapInfo) int {
return strings.Compare(a.filename, b.filename)
})
*h.h = (*h.h)[:h.count]
}
return true
}
func (h *fixedHeap) pop() (heapInfo, bool) {
if h.h.Len() != 0 {
return heap.Pop(h.h).(heapInfo), true
if !h.sorted {
slices.SortFunc(*h.h, func(a, b heapInfo) int {
return strings.Compare(a.filename, b.filename)
})
if len(*h.h) > h.count {
*h.h = (*h.h)[:h.count]
}
h.sorted = true
}
if len(*h.h) != 0 {
info := (*h.h)[0]
*h.h = (*h.h)[1:]
return info, true
}
return heapInfo{}, false
}

View file

@ -641,8 +641,8 @@ func (c *Client) notaryTxValidationLimit() (uint32, error) {
return 0, fmt.Errorf("can't get current blockchain height: %w", err)
}
min := bc + c.notary.txValidTime
rounded := (min/c.notary.roundTime + 1) * c.notary.roundTime
minTime := bc + c.notary.txValidTime
rounded := (minTime/c.notary.roundTime + 1) * c.notary.roundTime
return rounded, nil
}

View file

@ -33,10 +33,15 @@ func (a *auditService) AddChain(ctx context.Context, req *apemanager.AddChainReq
return res, err
}
var respChainID []byte
if respBody := res.GetBody(); respBody != nil {
respChainID = respBody.GetChainID()
}
audit.LogRequest(a.log, ape_grpc.APEManagerService_AddChain_FullMethodName, req,
audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(),
req.GetBody().GetTarget().GetName(),
res.GetBody().GetChainID()),
respChainID),
err == nil)
return res, err

View file

@ -2,9 +2,11 @@ package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
@ -120,6 +122,12 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.log.Debug(logs.OperationFinishedWithError,
zap.Error(exec.err),
)
var errAccessDenied *apistatus.ObjectAccessDenied
if execCnr && errors.As(exec.err, &errAccessDenied) {
// Local get can't return access denied error, so this error was returned by
// write to the output stream. So there is no need to try to find object on other nodes.
return
}
if execCnr {
exec.executeOnContainer(ctx)

View file

@ -31,6 +31,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
var errAccessDenied *apistatus.ObjectAccessDenied
switch {
default:
@ -38,7 +39,11 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
if r.status != statusEC {
// for raw requests, continue to collect other parts
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
if errors.As(err, &errAccessDenied) {
r.err = err
} else {
r.err = new(apistatus.ObjectNotFound)
}
}
return false
case err == nil:

View file

@ -23,12 +23,14 @@ import (
)
type getRequestForwarder struct {
OnceResign sync.Once
OnceHeaderSending sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRequest
Stream *streamObjectWriter
OnceResign sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRequest
Stream *streamObjectWriter
headerSent bool
headerSentGuard sync.Mutex
}
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*objectSDK.Object, error) {
@ -83,13 +85,15 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
var err error
f.OnceHeaderSending.Do(func() {
err = f.Stream.WriteHeader(ctx, objectSDK.NewFromV2(obj))
})
if err != nil {
f.headerSentGuard.Lock()
defer f.headerSentGuard.Unlock()
if f.headerSent {
return nil
}
if err := f.Stream.WriteHeader(ctx, objectSDK.NewFromV2(obj)); err != nil {
return errCouldNotWriteObjHeader(err)
}
f.headerSent = true
return nil
}

View file

@ -24,6 +24,8 @@ type distributedTarget struct {
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
relay func(context.Context, nodeDesc) error
resetSuccessAfterOnBroadcast bool
}
// parameters and state of container traversal.
@ -35,6 +37,8 @@ type traversal struct {
// container nodes which was processed during the primary object placement
mExclude map[string]*bool
resetSuccessAfterOnBroadcast bool
}
// updates traversal parameters after the primary placement finish and
@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
// do not track success during container broadcast (best-effort)
x.opts = append(x.opts, placement.WithoutSuccessTracking())
if x.resetSuccessAfterOnBroadcast {
x.opts = append(x.opts, placement.ResetSuccessAfter())
}
// avoid 2nd broadcast
x.extraBroadcastEnabled = false
@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.forEachNode(ctx, t.sendObject)
}

View file

@ -197,14 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
if err != nil {
return err
}
partsProcessed := make([]atomic.Bool, len(parts))
objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
for {
eg, egCtx := errgroup.WithContext(ctx)
nodes := t.Next()
if len(nodes) == 0 {
break
@ -216,14 +217,20 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
}
for idx := range parts {
idx := idx
eg.Go(func() error {
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
})
t.SubmitSuccess()
if !partsProcessed[idx].Load() {
eg.Go(func() error {
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
if err == nil {
partsProcessed[idx].Store(true)
t.SubmitSuccess()
}
return err
})
}
}
err = eg.Wait()
}
if err := eg.Wait(); err != nil {
if err != nil {
return errIncompletePut{
singleErr: err,
}

View file

@ -0,0 +1,191 @@
package putsvc
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"strconv"
"testing"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
type testPlacementBuilder struct {
vectors [][]netmap.NodeInfo
}
func (p *testPlacementBuilder) BuildPlacement(_ cid.ID, _ *oid.ID, _ netmap.PlacementPolicy) (
[][]netmap.NodeInfo, error,
) {
arr := make([]netmap.NodeInfo, len(p.vectors[0]))
copy(arr, p.vectors[0])
return [][]netmap.NodeInfo{arr}, nil
}
type nmKeys struct{}
func (nmKeys) IsLocalKey(_ []byte) bool {
return false
}
type clientConstructor struct {
vectors [][]netmap.NodeInfo
}
func (c clientConstructor) Get(info client.NodeInfo) (client.MultiAddressClient, error) {
if bytes.Equal(info.PublicKey(), c.vectors[0][0].PublicKey()) ||
bytes.Equal(info.PublicKey(), c.vectors[0][1].PublicKey()) {
return multiAddressClient{err: errors.New("node unavailable")}, nil
}
return multiAddressClient{}, nil
}
type multiAddressClient struct {
client.MultiAddressClient
err error
}
func (c multiAddressClient) ObjectPutSingle(_ context.Context, _ apiclient.PrmObjectPutSingle) (*apiclient.ResObjectPutSingle, error) {
if c.err != nil {
return nil, c.err
}
return &apiclient.ResObjectPutSingle{}, nil
}
func (c multiAddressClient) ReportError(error) {
}
func (multiAddressClient) RawForAddress(context.Context, network.Address, func(cli *rawclient.Client) error) error {
return nil
}
func TestECWriter(t *testing.T) {
// Create container with policy EC 1.1
cnr := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetECDataCount(1)
x1.SetECParityCount(1)
p1.AddReplicas(x1)
cnr.SetPlacementPolicy(p1)
cnr.SetAttribute("cnr", "cnr1")
cid := cidtest.ID()
// Create 4 nodes, 2 nodes for chunks,
// 2 nodes for the case when the first two will fail.
ns, _ := testNodeMatrix(t, []int{4})
data := make([]byte, 100)
_, _ = rand.Read(data)
ver := version.Current()
var csum checksum.Checksum
csum.SetSHA256(sha256.Sum256(data))
var csumTZ checksum.Checksum
csumTZ.SetTillichZemor(tz.Sum(csum.Value()))
obj := objectSDK.New()
obj.SetID(oidtest.ID())
obj.SetOwnerID(usertest.ID())
obj.SetContainerID(cid)
obj.SetVersion(&ver)
obj.SetPayload(data)
obj.SetPayloadSize(uint64(len(data)))
obj.SetPayloadChecksum(csum)
obj.SetPayloadHomomorphicHash(csumTZ)
// Builder return nodes without sort by hrw
builder := &testPlacementBuilder{
vectors: ns,
}
ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err)
log, err := logger.NewLogger(nil)
require.NoError(t, err)
var n nmKeys
ecw := ecWriter{
cfg: &cfg{
netmapKeys: n,
remotePool: pool,
log: log,
clientConstructor: clientConstructor{vectors: ns},
},
placementOpts: append(
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: cnr,
key: &ownerKey.PrivateKey,
relay: nil,
objMetaValid: true,
}
err = ecw.WriteObject(context.Background(), obj)
require.NoError(t, err)
}
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
mNodes := make([][]netmap.NodeInfo, len(dim))
mAddr := make([][]string, len(dim))
for i := range dim {
ns := make([]netmap.NodeInfo, dim[i])
as := make([]string, dim[i])
for j := range dim[i] {
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
strconv.Itoa(i),
strconv.Itoa(60000+j),
)
var ni netmap.NodeInfo
ni.SetNetworkEndpoints(a)
ni.SetPublicKey([]byte(a))
var na network.AddressGroup
err := na.FromIterator(netmapcore.Node(ni))
require.NoError(t, err)
as[j] = network.StringifyGroup(na)
ns[j] = ni
}
mNodes[i] = ns
mAddr[i] = as
}
return mNodes, mAddr
}

View file

@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
signer := &putSingleRequestSigner{
req: req,
@ -209,9 +210,10 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
}
type putSinglePlacement struct {
placementOptions []placement.Option
isEC bool
container containerSDK.Container
placementOptions []placement.Option
isEC bool
container containerSDK.Container
resetSuccessAfterOnBroadcast bool
}
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
}
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
result.resetSuccessAfterOnBroadcast = true
}
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))

View file

@ -233,16 +233,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedTarget{
cfg: p.cfg,
placementOpts: traverseOpts,
cfg: p.cfg,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{

View file

@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option {
}
}
// ResetSuccessAfter resets flat success number setting option.
func ResetSuccessAfter() Option {
return func(c *cfg) {
c.flatSuccess = nil
}
}
// WithoutSuccessTracking disables success tracking in traversal.
func WithoutSuccessTracking() Option {
return func(c *cfg) {

1
staticcheck.conf Normal file
View file

@ -0,0 +1 @@
checks = ["inherit", "-SA1019"]