forked from TrueCloudLab/frostfs-node
Compare commits: bugfix/ref ... master (38 commits)
SHA1:
9b113c3156
4de5fca547
9c4c5a5262
6fcae9f75a
1df64c5cab
6a580db55e
24054cf6f4
9ee3dd4e91
78bfd12229
57dc0a8e9e
b309b34bfc
c8acdf40bb
6410542d19
c0a341a7f6
e1a984e9d8
abfd9657f9
a788d44773
603015d029
30e14d50ef
951a7ee1c7
0bcbeb26b2
c98357606b
80de5d70bf
57efa0bc8e
26e0c82fb8
4538ccb12a
84e1599997
5a270e2e61
436d65d784
c3c034ecca
05fd999162
eff95bd632
fb928616cc
4d5ae59a52
a9f27e074b
6c51f48aab
a2485637bb
09faca034c
205 changed files with 1407 additions and 1267 deletions
.forgejo/workflows/oci-image.yml (new file, 28 lines)
@@ -0,0 +1,28 @@
+name: OCI image
+
+on:
+  push:
+  workflow_dispatch:
+
+jobs:
+  image:
+    name: Build container images
+    runs-on: docker
+    container: git.frostfs.info/truecloudlab/env:oci-image-builder-bookworm
+    steps:
+      - name: Clone git repo
+        uses: actions/checkout@v3
+
+      - name: Build OCI image
+        run: make images
+
+      - name: Push image to OCI registry
+        run: |
+          echo "$REGISTRY_PASSWORD" \
+            | docker login --username truecloudlab --password-stdin git.frostfs.info
+          make push-images
+        if: >-
+          startsWith(github.ref, 'refs/tags/v') &&
+          (github.event_name == 'workflow_dispatch' || github.event_name == 'push')
+        env:
+          REGISTRY_PASSWORD: ${{secrets.FORGEJO_OCI_REGISTRY_PUSH_TOKEN}}
@@ -18,7 +18,7 @@ jobs:
       - name: Setup Go
         uses: actions/setup-go@v3
         with:
-          go-version: '1.23'
+          go-version: '1.23.5'

       - name: Install govulncheck
         run: go install golang.org/x/vuln/cmd/govulncheck@latest
@@ -89,5 +89,7 @@ linters:
   - protogetter
   - intrange
   - tenv
+  - unconvert
+  - unparam
 disable-all: true
 fast: false
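For context, `unconvert` flags redundant type conversions and `unparam` flags function parameters or results that never vary (such as an error that is always nil); both drive several of the Go cleanups further down in this diff. A minimal illustration of what each linter catches, not code from this repository:

package main

import "fmt"

// unparam would flag this function: the error result is always nil,
// so the linter suggests dropping it from the signature.
func buildGreeting(name string) (string, error) {
	return "hello, " + name, nil
}

func main() {
	chunk := uint64(1024)
	// unconvert would flag this conversion: chunk is already uint64.
	addrs := uint64(chunk) / 72
	greeting, _ := buildGreeting("frostfs")
	fmt.Println(addrs, greeting)
}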
Makefile (11 changed lines)
@@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
 HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"

 GO_VERSION ?= 1.22
-LINT_VERSION ?= 1.62.0
+LINT_VERSION ?= 1.62.2
 TRUECLOUDLAB_LINT_VERSION ?= 0.0.8
 PROTOC_VERSION ?= 25.0
 PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go)
@@ -139,6 +139,15 @@ images: image-storage image-ir image-cli image-adm
 # Build dirty local Docker images
 dirty-images: image-dirty-storage image-dirty-ir image-dirty-cli image-dirty-adm

+# Push FrostFS components' docker image to the registry
+push-image-%:
+	@echo "⇒ Publish FrostFS $* docker image "
+	@docker push $(HUB_IMAGE)-$*:$(HUB_TAG)
+
+# Push all Docker images to the registry
+.PHONY: push-images
+push-images: push-image-storage push-image-ir push-image-cli push-image-adm
+
 # Run `make %` in Golang container
 docker/%:
 	docker run --rm -t \
@@ -28,6 +28,7 @@ const (
 var (
 	errNoPathsFound          = errors.New("no metabase paths found")
 	errNoMorphEndpointsFound = errors.New("no morph endpoints found")
+	errUpgradeFailed         = errors.New("upgrade failed")
 )

 var UpgradeCmd = &cobra.Command{
@@ -91,15 +92,20 @@ func upgrade(cmd *cobra.Command, _ []string) error {
 	if err := eg.Wait(); err != nil {
 		return err
 	}
+	allSuccess := true
 	for mb, ok := range result {
 		if ok {
 			cmd.Println(mb, ": success")
 		} else {
 			cmd.Println(mb, ": failed")
+			allSuccess = false
 		}
 	}
+	if allSuccess {
 		return nil
+	}
+	return errUpgradeFailed
 }

 func getMetabasePaths(appCfg *config.Config) ([]string, error) {
 	var paths []string
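The pattern introduced here: collect a per-metabase success flag, print each item's status, and return a sentinel error if anything failed so the CLI exits non-zero instead of silently succeeding. A stripped-down sketch of the same control flow, with a hypothetical result map standing in for the real upgrade results:

package main

import (
	"errors"
	"fmt"
	"os"
)

var errUpgradeFailed = errors.New("upgrade failed")

// report prints per-item status and returns a non-nil error
// if at least one item failed, so the process can exit non-zero.
func report(result map[string]bool) error {
	allSuccess := true
	for mb, ok := range result {
		if ok {
			fmt.Println(mb, ": success")
		} else {
			fmt.Println(mb, ": failed")
			allSuccess = false
		}
	}
	if allSuccess {
		return nil
	}
	return errUpgradeFailed
}

func main() {
	if err := report(map[string]bool{"/path/a": true, "/path/b": false}); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}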
@@ -253,7 +253,7 @@ func frostfsidListNamespaces(cmd *cobra.Command, _ []string) {
 	reader := frostfsidrpclient.NewReader(inv, hash)
 	sessionID, it, err := reader.ListNamespaces()
 	commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
-	items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+	items, err := readIterator(inv, &it, sessionID)
 	commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)

 	namespaces, err := frostfsidclient.ParseNamespaces(items)
@@ -305,7 +305,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
 	sessionID, it, err := reader.ListNamespaceSubjects(ns)
 	commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)

-	subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, iteratorBatchSize, sessionID))
+	subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, sessionID))
 	commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)

 	sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
@@ -319,7 +319,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
 	sessionID, it, err := reader.ListSubjects()
 	commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)

-	items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+	items, err := readIterator(inv, &it, sessionID)
 	commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)

 	subj, err := frostfsidclient.ParseSubject(items)
@@ -365,7 +365,7 @@ func frostfsidListGroups(cmd *cobra.Command, _ []string) {
 	sessionID, it, err := reader.ListGroups(ns)
 	commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)

-	items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+	items, err := readIterator(inv, &it, sessionID)
 	commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
 	groups, err := frostfsidclient.ParseGroups(items)
 	commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
@@ -415,7 +415,7 @@ func frostfsidListGroupSubjects(cmd *cobra.Command, _ []string) {
 	sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
 	commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)

-	items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+	items, err := readIterator(inv, &it, sessionID)
 	commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)

 	subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
@@ -492,17 +492,17 @@ func (f *frostfsidClient) sendWaitRes() (*state.AppExecResult, error) {
 	return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
 }

-func readIterator(inv *invoker.Invoker, iter *result.Iterator, batchSize int, sessionID uuid.UUID) ([]stackitem.Item, error) {
+func readIterator(inv *invoker.Invoker, iter *result.Iterator, sessionID uuid.UUID) ([]stackitem.Item, error) {
 	var shouldStop bool
 	res := make([]stackitem.Item, 0)
 	for !shouldStop {
-		items, err := inv.TraverseIterator(sessionID, iter, batchSize)
+		items, err := inv.TraverseIterator(sessionID, iter, iteratorBatchSize)
 		if err != nil {
 			return nil, err
 		}

 		res = append(res, items...)
-		shouldStop = len(items) < batchSize
+		shouldStop = len(items) < iteratorBatchSize
 	}

 	return res, nil
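Every call site passed the same `iteratorBatchSize` constant, so the parameter was dead weight; `readIterator` now reads it directly. The loop itself drains a server-side iterator session in fixed-size batches and stops at the first short batch. The same pagination logic with the RPC call abstracted away (`fetch` is a stand-in for `inv.TraverseIterator`, not a real API):

package main

import "fmt"

const iteratorBatchSize = 100

// drain keeps fetching until a batch comes back shorter than the
// batch size, which signals the remote iterator is exhausted.
func drain(fetch func(n int) ([]int, error)) ([]int, error) {
	var res []int
	for {
		items, err := fetch(iteratorBatchSize)
		if err != nil {
			return nil, err
		}
		res = append(res, items...)
		if len(items) < iteratorBatchSize {
			return res, nil
		}
	}
}

func main() {
	remaining := 250
	fetch := func(n int) ([]int, error) {
		if n > remaining {
			n = remaining
		}
		batch := make([]int, n)
		remaining -= n
		return batch, nil
	}
	items, _ := drain(fetch)
	fmt.Println("total items:", len(items)) // 250, gathered in batches of 100, 100, 50
}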
@@ -11,6 +11,7 @@ import (
 	"net/url"
 	"os"
 	"path/filepath"
+	"slices"
 	"strconv"
 	"strings"
 	"text/template"
@@ -410,8 +411,7 @@ func initClient(rpc []string) *rpcclient.Client {
 	var c *rpcclient.Client
 	var err error

-	shuffled := make([]string, len(rpc))
-	copy(shuffled, rpc)
+	shuffled := slices.Clone(rpc)
 	rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })

 	for _, endpoint := range shuffled {
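`slices.Clone` (standard library since Go 1.21) replaces the two-step make-plus-copy idiom with a single call that allocates and copies at once, leaving the source slice untouched. A quick demonstration of the before/after equivalence:

package main

import (
	"fmt"
	"math/rand"
	"slices"
)

func main() {
	rpc := []string{"ep1", "ep2", "ep3"}
	shuffled := slices.Clone(rpc) // same effect as make + copy, in one line
	rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
	fmt.Println(rpc, shuffled) // rpc keeps its original order
}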
@@ -52,7 +52,7 @@ func genereateAPEOverride(cmd *cobra.Command, _ []string) {

 	outputPath, _ := cmd.Flags().GetString(outputFlag)
 	if outputPath != "" {
-		err := os.WriteFile(outputPath, []byte(overrideMarshalled), 0o644)
+		err := os.WriteFile(outputPath, overrideMarshalled, 0o644)
 		commonCmd.ExitOnErr(cmd, "dump error: %w", err)
 	} else {
 		fmt.Print("\n")
@@ -23,11 +23,11 @@ type policyPlaygroundREPL struct {
 	nodes map[string]netmap.NodeInfo
 }

-func newPolicyPlaygroundREPL(cmd *cobra.Command) (*policyPlaygroundREPL, error) {
+func newPolicyPlaygroundREPL(cmd *cobra.Command) *policyPlaygroundREPL {
 	return &policyPlaygroundREPL{
 		cmd:   cmd,
 		nodes: map[string]netmap.NodeInfo{},
-	}, nil
+	}
 }

 func (repl *policyPlaygroundREPL) handleLs(args []string) error {
@@ -246,8 +246,7 @@ var policyPlaygroundCmd = &cobra.Command{
 	Long: `A REPL for testing placement policies.
 If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`,
 	Run: func(cmd *cobra.Command, _ []string) {
-		repl, err := newPolicyPlaygroundREPL(cmd)
-		commonCmd.ExitOnErr(cmd, "could not create policy playground: %w", err)
+		repl := newPolicyPlaygroundREPL(cmd)
 		commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run())
 	},
 }
@@ -127,7 +127,7 @@ func awaitSetNetmapStatus(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *client.
 		var resp *control.GetNetmapStatusResponse
 		var err error
 		err = cli.ExecRaw(func(client *rawclient.Client) error {
-			resp, err = control.GetNetmapStatus(client, req)
+			resp, err = control.GetNetmapStatus(cmd.Context(), client, req)
 			return err
 		})
 		commonCmd.ExitOnErr(cmd, "failed to get current netmap status: %w", err)
@@ -41,7 +41,7 @@ func initObjectHashCmd() {
 	flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
 	_ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag)

-	flags.String("range", "", "Range to take hash from in the form offset1:length1,...")
+	flags.StringSlice("range", nil, "Range to take hash from in the form offset1:length1,...")
 	_ = objectHashCmd.MarkFlagRequired("range")

 	flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'")
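Switching from `flags.String` to `flags.StringSlice` moves comma-splitting into pflag itself and additionally lets the flag be repeated on the command line. A small standalone sketch of the difference, using a hypothetical flag set rather than the real command:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	fs.StringSlice("range", nil, "ranges in the form offset:length")

	// Comma-separated values and repeated flags both land in one slice.
	_ = fs.Parse([]string{"--range", "0:100,200:50", "--range", "300:10"})
	vs, _ := fs.GetStringSlice("range")
	fmt.Println(vs) // [0:100 200:50 300:10]
}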
@@ -320,7 +320,7 @@ func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placem
 	}
 	placementBuilder := placement.NewNetworkMapBuilder(netmap)
 	for _, object := range objects {
-		placement, err := placementBuilder.BuildPlacement(object.containerID, &object.objectID, placementPolicy)
+		placement, err := placementBuilder.BuildPlacement(cmd.Context(), object.containerID, &object.objectID, placementPolicy)
 		commonCmd.ExitOnErr(cmd, "failed to get required placement for object: %w", err)
 		for repIdx, rep := range placement {
 			numOfReplicas := placementPolicy.ReplicaDescriptor(repIdx).NumberOfObjects()
@@ -358,7 +358,7 @@ func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placem
 		placementObjectID = object.ecHeader.parent
 	}
 	placementBuilder := placement.NewNetworkMapBuilder(netmap)
-	placement, err := placementBuilder.BuildPlacement(object.containerID, &placementObjectID, placementPolicy)
+	placement, err := placementBuilder.BuildPlacement(cmd.Context(), object.containerID, &placementObjectID, placementPolicy)
 	commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)

 	for _, vector := range placement {
@@ -46,7 +46,7 @@ func initObjectPatchCmd() {
 	flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
 	_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)

-	flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2")
+	flags.StringSlice(newAttrsFlagName, nil, "New object attributes in form of Key1=Value1,Key2=Value2")
 	flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
 	flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
 	flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")
@@ -99,11 +99,9 @@ func patch(cmd *cobra.Command, _ []string) {
 }

 func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
-	var rawAttrs []string
-
-	raw := cmd.Flag(newAttrsFlagName).Value.String()
-	if len(raw) != 0 {
-		rawAttrs = strings.Split(raw, ",")
+	rawAttrs, err := cmd.Flags().GetStringSlice(newAttrsFlagName)
+	if err != nil {
+		return nil, err
 	}

 	attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
@@ -50,7 +50,7 @@ func initObjectPutCmd() {

 	flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)

-	flags.String("attributes", "", "User attributes in form of Key1=Value1,Key2=Value2")
+	flags.StringSlice("attributes", nil, "User attributes in form of Key1=Value1,Key2=Value2")
 	flags.Bool("disable-filename", false, "Do not set well-known filename attribute")
 	flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
 	flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")
@@ -214,11 +214,9 @@ func getAllObjectAttributes(cmd *cobra.Command) []objectSDK.Attribute {
 }

 func parseObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
-	var rawAttrs []string
-
-	raw := cmd.Flag("attributes").Value.String()
-	if len(raw) != 0 {
-		rawAttrs = strings.Split(raw, ",")
+	rawAttrs, err := cmd.Flags().GetStringSlice("attributes")
+	if err != nil {
+		return nil, err
 	}

 	attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
@@ -38,7 +38,7 @@ func initObjectRangeCmd() {
 	flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
 	_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)

-	flags.String("range", "", "Range to take data from in the form offset:length")
+	flags.StringSlice("range", nil, "Range to take data from in the form offset:length")
 	flags.String(fileFlag, "", "File to write object payload to. Default: stdout.")
 	flags.Bool(rawFlag, false, rawFlagDesc)
 }
@@ -195,11 +195,10 @@ func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
 }

 func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
-	v := cmd.Flag("range").Value.String()
-	if len(v) == 0 {
-		return nil, nil
+	vs, err := cmd.Flags().GetStringSlice("range")
+	if len(vs) == 0 || err != nil {
+		return nil, err
 	}
-	vs := strings.Split(v, ",")
 	rs := make([]objectSDK.Range, len(vs))
 	for i := range vs {
 		before, after, found := strings.Cut(vs[i], rangeSep)
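After the change, `getRangeList` receives pre-split tokens from `GetStringSlice` and only has to parse each `offset:length` pair; `strings.Cut` splits each token on the first separator. A standalone sketch of that parsing step, with a simplified Range type in place of the SDK's:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

type Range struct{ Offset, Length uint64 }

// parseRanges turns tokens like "0:100" into Range values,
// rejecting tokens without a ":" separator.
func parseRanges(vs []string) ([]Range, error) {
	rs := make([]Range, len(vs))
	for i := range vs {
		before, after, found := strings.Cut(vs[i], ":")
		if !found {
			return nil, fmt.Errorf("invalid range %q", vs[i])
		}
		off, err := strconv.ParseUint(before, 10, 64)
		if err != nil {
			return nil, err
		}
		ln, err := strconv.ParseUint(after, 10, 64)
		if err != nil {
			return nil, err
		}
		rs[i] = Range{Offset: off, Length: ln}
	}
	return rs, nil
}

func main() {
	rs, err := parseRanges([]string{"0:100", "300:10"})
	fmt.Println(rs, err) // [{0 100} {300 10}] <nil>
}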
@@ -124,10 +124,7 @@ func (v *BucketsView) loadNodeChildren(
 	path := parentBucket.Path
 	parser := parentBucket.NextParser

-	buffer, err := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
-	if err != nil {
-		return err
-	}
+	buffer := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)

 	for item := range buffer {
 		if item.err != nil {
@@ -135,6 +132,7 @@ func (v *BucketsView) loadNodeChildren(
 		}
 		bucket := item.val

+		var err error
 		bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil)
 		if err != nil {
 			return err
@@ -180,10 +178,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
 	defer cancel()

 	// Check the current bucket's nested buckets if exist
-	bucketsBuffer, err := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
-	if err != nil {
-		return false, err
-	}
+	bucketsBuffer := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)

 	for item := range bucketsBuffer {
 		if item.err != nil {
@@ -191,6 +186,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
 		}
 		b := item.val

+		var err error
 		b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil)
 		if err != nil {
 			return false, err
@@ -206,10 +202,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
 	}

 	// Check the current bucket's nested records if exist
-	recordsBuffer, err := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
-	if err != nil {
-		return false, err
-	}
+	recordsBuffer := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)

 	for item := range recordsBuffer {
 		if item.err != nil {
@@ -217,6 +210,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
 		}
 		r := item.val

+		var err error
 		r.Entry, _, err = bucket.NextParser(r.Key, r.Value)
 		if err != nil {
 			return false, err
@@ -35,7 +35,7 @@ func resolvePath(tx *bbolt.Tx, path [][]byte) (*bbolt.Bucket, error) {
 func load[T any](
 	ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
 	filter func(key, value []byte) bool, transform func(key, value []byte) T,
-) (<-chan Item[T], error) {
+) <-chan Item[T] {
 	buffer := make(chan Item[T], bufferSize)

 	go func() {
@@ -77,13 +77,13 @@ func load[T any](
 		}
 	}()

-	return buffer, nil
+	return buffer
 }

 func LoadBuckets(
 	ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
-) (<-chan Item[*Bucket], error) {
-	buffer, err := load(
+) <-chan Item[*Bucket] {
+	buffer := load(
 		ctx, db, path, bufferSize,
 		func(_, value []byte) bool {
 			return value == nil
@@ -98,17 +98,14 @@ func LoadBuckets(
 			}
 		},
 	)
-	if err != nil {
-		return nil, fmt.Errorf("can't start iterating bucket: %w", err)
-	}

-	return buffer, nil
+	return buffer
 }

 func LoadRecords(
 	ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
-) (<-chan Item[*Record], error) {
-	buffer, err := load(
+) <-chan Item[*Record] {
+	buffer := load(
 		ctx, db, path, bufferSize,
 		func(_, value []byte) bool {
 			return value != nil
@@ -124,11 +121,8 @@ func LoadRecords(
 			}
 		},
 	)
-	if err != nil {
-		return nil, fmt.Errorf("can't start iterating bucket: %w", err)
-	}

-	return buffer, nil
+	return buffer
 }

 // HasBuckets checks if a bucket has nested buckets. It relies on assumption
@@ -137,24 +131,21 @@ func HasBuckets(ctx context.Context, db *bbolt.DB, path [][]byte) (bool, error)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	buffer, err := load(
+	buffer := load(
 		ctx, db, path, 1,
 		nil,
 		func(_, value []byte) []byte { return value },
 	)
-	if err != nil {
-		return false, err
-	}

 	x, ok := <-buffer
 	if !ok {
 		return false, nil
 	}
 	if x.err != nil {
-		return false, err
+		return false, x.err
 	}
 	if x.val != nil {
-		return false, err
+		return false, nil
 	}
 	return true, nil
 }
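After this refactor, `load` cannot fail at call time: it returns only a channel, and any error produced during iteration travels through the channel inside `Item[T]`. Callers get a single loop instead of a (channel, error) pair to handle, and `HasBuckets` also stops returning the stale outer `err` in favor of `x.err`. A minimal sketch of the producer pattern, with a simplified Item type and no bbolt:

package main

import (
	"context"
	"fmt"
)

type Item[T any] struct {
	val T
	err error
}

// produce streams values and delivers failures in-band,
// so construction of the stream itself can never fail.
func produce(ctx context.Context, vals []int) <-chan Item[int] {
	out := make(chan Item[int], len(vals))
	go func() {
		defer close(out)
		for _, v := range vals {
			select {
			case out <- Item[int]{val: v}:
			case <-ctx.Done():
				out <- Item[int]{err: ctx.Err()}
				return
			}
		}
	}()
	return out
}

func main() {
	for item := range produce(context.Background(), []int{1, 2, 3}) {
		if item.err != nil {
			fmt.Println("error:", item.err)
			break
		}
		fmt.Println(item.val)
	}
}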
@@ -62,10 +62,7 @@ func (v *RecordsView) Mount(ctx context.Context) error {

 	ctx, v.onUnmount = context.WithCancel(ctx)

-	tempBuffer, err := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
-	if err != nil {
-		return err
-	}
+	tempBuffer := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)

 	v.buffer = make(chan *Record, v.ui.loadBufferSize)
 	go func() {
@@ -73,11 +70,12 @@ func (v *RecordsView) Mount(ctx context.Context) error {

 		for item := range tempBuffer {
 			if item.err != nil {
-				v.ui.stopOnError(err)
+				v.ui.stopOnError(item.err)
 				break
 			}
 			record := item.val

+			var err error
 			record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value)
 			if err != nil {
 				v.ui.stopOnError(err)
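Note the bug this hunk fixes along the way: the loop reported the outer `err` from `LoadRecords` to `stopOnError` instead of `item.err`, so the channel-delivered failure was silently replaced by a stale (usually nil) value. Declaring `var err error` inside the loop scopes the parse error to each iteration and makes the wrong capture impossible to reintroduce. A toy reproduction of the wrong-variable pattern:

package main

import (
	"errors"
	"fmt"
)

func main() {
	var err error // outer variable, stays nil
	items := []error{nil, errors.New("parse failed")}
	for _, itemErr := range items {
		if itemErr != nil {
			// Bug pattern: reporting the outer err logs <nil>.
			fmt.Println("wrong:", err)
			fmt.Println("right:", itemErr)
		}
	}
}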
@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"sync"
 	"time"

@@ -16,7 +17,7 @@ import (
 	"github.com/hashicorp/golang-lru/v2/expirable"
 )

-type netValueReader[K any, V any] func(K) (V, error)
+type netValueReader[K any, V any] func(ctx context.Context, cid K) (V, error)

 type valueWithError[V any] struct {
 	v V
@@ -49,7 +50,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 // updates the value from the network on cache miss or by TTL.
 //
 // returned value should not be modified.
-func (c *ttlNetCache[K, V]) get(key K) (V, error) {
+func (c *ttlNetCache[K, V]) get(ctx context.Context, key K) (V, error) {
 	hit := false
 	startedAt := time.Now()
 	defer func() {
@@ -71,7 +72,7 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 		return val.v, val.e
 	}

-	v, err := c.netRdr(key)
+	v, err := c.netRdr(ctx, key)

 	c.cache.Add(key, &valueWithError[V]{
 		v: v,
@@ -135,7 +136,7 @@ func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap]
 // updates the value from the network on cache miss.
 //
 // returned value should not be modified.
-func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
+func (c *lruNetCache) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
 	hit := false
 	startedAt := time.Now()
 	defer func() {
@@ -148,7 +149,7 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
 		return val, nil
 	}

-	val, err := c.netRdr(key)
+	val, err := c.netRdr(ctx, key)
 	if err != nil {
 		return nil, err
 	}
@@ -166,11 +167,11 @@ type ttlContainerStorage struct {
 }

 func newCachedContainerStorage(v container.Source, ttl time.Duration, containerCacheSize uint32) ttlContainerStorage {
-	lruCnrCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(id cid.ID) (*container.Container, error) {
-		return v.Get(id)
+	lruCnrCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(ctx context.Context, id cid.ID) (*container.Container, error) {
+		return v.Get(ctx, id)
 	}, metrics.NewCacheMetrics("container"))
-	lruDelInfoCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(id cid.ID) (*container.DelInfo, error) {
-		return v.DeletionInfo(id)
+	lruDelInfoCache := newNetworkTTLCache(int(containerCacheSize), ttl, func(ctx context.Context, id cid.ID) (*container.DelInfo, error) {
+		return v.DeletionInfo(ctx, id)
 	}, metrics.NewCacheMetrics("container_deletion_info"))

 	return ttlContainerStorage{
@@ -188,12 +189,12 @@ func (s ttlContainerStorage) handleRemoval(cnr cid.ID) {

 // Get returns container value from the cache. If value is missing in the cache
 // or expired, then it returns value from side chain and updates the cache.
-func (s ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
-	return s.containerCache.get(cnr)
+func (s ttlContainerStorage) Get(ctx context.Context, cnr cid.ID) (*container.Container, error) {
+	return s.containerCache.get(ctx, cnr)
 }

-func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error) {
-	return s.delInfoCache.get(cnr)
+func (s ttlContainerStorage) DeletionInfo(ctx context.Context, cnr cid.ID) (*container.DelInfo, error) {
+	return s.delInfoCache.get(ctx, cnr)
 }

 type lruNetmapSource struct {
@@ -205,8 +206,8 @@ type lruNetmapSource struct {
 func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
 	const netmapCacheSize = 10

-	lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(key uint64) (*netmapSDK.NetMap, error) {
-		return v.GetNetMapByEpoch(key)
+	lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
+		return v.GetNetMapByEpoch(ctx, key)
 	}, metrics.NewCacheMetrics("netmap"))

 	return &lruNetmapSource{
@@ -215,16 +216,16 @@ func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
 	}
 }

-func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.NetMap, error) {
-	return s.getNetMapByEpoch(s.netState.CurrentEpoch() - diff)
+func (s *lruNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
+	return s.getNetMapByEpoch(ctx, s.netState.CurrentEpoch()-diff)
 }

-func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
-	return s.getNetMapByEpoch(epoch)
+func (s *lruNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
+	return s.getNetMapByEpoch(ctx, epoch)
 }

-func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
-	val, err := s.cache.get(epoch)
+func (s *lruNetmapSource) getNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
+	val, err := s.cache.get(ctx, epoch)
 	if err != nil {
 		return nil, err
 	}
@@ -232,7 +233,7 @@ func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, err
 	return val, nil
 }

-func (s *lruNetmapSource) Epoch() (uint64, error) {
+func (s *lruNetmapSource) Epoch(_ context.Context) (uint64, error) {
 	return s.netState.CurrentEpoch(), nil
 }

@@ -240,7 +241,10 @@ type cachedIRFetcher struct {
 	*ttlNetCache[struct{}, [][]byte]
 }

-func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) cachedIRFetcher {
+func newCachedIRFetcher(f interface {
+	InnerRingKeys(ctx context.Context) ([][]byte, error)
+},
+) cachedIRFetcher {
 	const (
 		irFetcherCacheSize = 1 // we intend to store only one value

@@ -254,8 +258,8 @@ func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) cached
 	)

 	irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
-		func(_ struct{}) ([][]byte, error) {
-			return f.InnerRingKeys()
+		func(ctx context.Context, _ struct{}) ([][]byte, error) {
+			return f.InnerRingKeys(ctx)
 		}, metrics.NewCacheMetrics("ir_keys"),
 	)

@@ -265,8 +269,8 @@ func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) cached
 // InnerRingKeys returns cached list of Inner Ring keys. If keys are missing in
 // the cache or expired, then it returns keys from side chain and updates
 // the cache.
-func (f cachedIRFetcher) InnerRingKeys() ([][]byte, error) {
-	val, err := f.get(struct{}{})
+func (f cachedIRFetcher) InnerRingKeys(ctx context.Context) ([][]byte, error) {
+	val, err := f.get(ctx, struct{}{})
 	if err != nil {
 		return nil, err
 	}
@@ -289,7 +293,7 @@ func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.M
 	}
 }

-func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
+func (c *ttlMaxObjectSizeCache) MaxObjectSize(ctx context.Context) uint64 {
 	const ttl = time.Second * 30

 	hit := false
@@ -311,7 +315,7 @@ func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
 	c.mtx.Lock()
 	size = c.lastSize
 	if !c.lastUpdated.After(prevUpdated) {
-		size = c.src.MaxObjectSize()
+		size = c.src.MaxObjectSize(ctx)
 		c.lastSize = size
 		c.lastUpdated = time.Now()
 	}
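This whole file follows one mechanical recipe: add a `context.Context` as the first parameter of `netValueReader` and every `get`, then pass it through to the underlying source so that RPC calls made on a cache miss become cancellable from the caller. A reduced sketch of the same shape, with a toy map-backed cache and none of the TTL or metrics machinery:

package main

import (
	"context"
	"fmt"
)

type netValueReader[K comparable, V any] func(ctx context.Context, key K) (V, error)

type netCache[K comparable, V any] struct {
	data   map[K]V
	netRdr netValueReader[K, V]
}

// get consults the local map first and only hits the network reader
// on a miss; the caller's ctx bounds that network call.
func (c *netCache[K, V]) get(ctx context.Context, key K) (V, error) {
	if v, ok := c.data[key]; ok {
		return v, nil
	}
	v, err := c.netRdr(ctx, key)
	if err != nil {
		var zero V
		return zero, err
	}
	c.data[key] = v
	return v, nil
}

func main() {
	c := &netCache[uint64, string]{
		data: map[uint64]string{},
		netRdr: func(ctx context.Context, epoch uint64) (string, error) {
			return fmt.Sprintf("netmap@%d", epoch), ctx.Err()
		},
	}
	v, err := c.get(context.Background(), 42)
	fmt.Println(v, err) // netmap@42 <nil>
}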
@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"errors"
 	"testing"
 	"time"

@@ -17,7 +18,7 @@ func TestTTLNetCache(t *testing.T) {
 	t.Run("Test Add and Get", func(t *testing.T) {
 		ti := time.Now()
 		cache.set(key, ti, nil)
-		val, err := cache.get(key)
+		val, err := cache.get(context.Background(), key)
 		require.NoError(t, err)
 		require.Equal(t, ti, val)
 	})
@@ -26,7 +27,7 @@ func TestTTLNetCache(t *testing.T) {
 		ti := time.Now()
 		cache.set(key, ti, nil)
 		time.Sleep(2 * ttlDuration)
-		val, err := cache.get(key)
+		val, err := cache.get(context.Background(), key)
 		require.NoError(t, err)
 		require.NotEqual(t, val, ti)
 	})
@@ -35,20 +36,20 @@ func TestTTLNetCache(t *testing.T) {
 		ti := time.Now()
 		cache.set(key, ti, nil)
 		cache.remove(key)
-		val, err := cache.get(key)
+		val, err := cache.get(context.Background(), key)
 		require.NoError(t, err)
 		require.NotEqual(t, val, ti)
 	})

 	t.Run("Test Cache Error", func(t *testing.T) {
 		cache.set("error", time.Now(), errors.New("mock error"))
-		_, err := cache.get("error")
+		_, err := cache.get(context.Background(), "error")
 		require.Error(t, err)
 		require.Equal(t, "mock error", err.Error())
 	})
 }

-func testNetValueReader(key string) (time.Time, error) {
+func testNetValueReader(_ context.Context, key string) (time.Time, error) {
 	if key == "error" {
 		return time.Now(), errors.New("mock error")
 	}
@@ -698,8 +698,7 @@ func initCfg(appCfg *config.Config) *cfg {

 	netState.metrics = c.metricsCollector

-	logPrm, err := c.loggerPrm()
-	fatalOnErr(err)
+	logPrm := c.loggerPrm()
 	logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
 	log, err := logger.NewLogger(logPrm)
 	fatalOnErr(err)
@@ -854,7 +853,7 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {

 func initCfgGRPC() cfgGRPC {
 	maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
-	maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
+	maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes

 	return cfgGRPC{
 		maxChunkSize: maxChunkSize,
@@ -1059,7 +1058,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	return sh
 }

-func (c *cfg) loggerPrm() (*logger.Prm, error) {
+func (c *cfg) loggerPrm() *logger.Prm {
 	// check if it has been inited before
 	if c.dynamicConfiguration.logger == nil {
 		c.dynamicConfiguration.logger = new(logger.Prm)
@@ -1078,7 +1077,7 @@ func (c *cfg) loggerPrm() (*logger.Prm, error) {
 	}
 	c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp

-	return c.dynamicConfiguration.logger, nil
+	return c.dynamicConfiguration.logger
 }

 func (c *cfg) LocalAddress() network.AddressGroup {
@@ -1147,7 +1146,7 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
 		c.cfgObject.cfgAccessPolicyEngine.policyContractHash)

 	cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
-	if cacheSize > 0 {
+	if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
 		morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
 	}

@@ -1206,7 +1205,7 @@ func (c *cfg) setContractNodeInfo(ni *netmap.NodeInfo) {
 }

 func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
-	ni, err := c.netmapLocalNodeState(epoch)
+	ni, err := c.netmapLocalNodeState(ctx, epoch)
 	if err != nil {
 		c.log.Error(ctx, logs.FrostFSNodeCouldNotUpdateNodeStateOnNewEpoch,
 			zap.Uint64("epoch", epoch),
@@ -1334,11 +1333,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {

 	// Logger

-	logPrm, err := c.loggerPrm()
-	if err != nil {
-		c.log.Error(ctx, logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
-		return
-	}
+	logPrm := c.loggerPrm()

 	components := c.getComponents(ctx, logPrm)
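`loggerPrm` never returned a non-nil error, so this unparam-driven cleanup drops the error result together with the now-dead `fatalOnErr` and reload error paths. What survives is a plain lazily-initialized accessor; a sketch of that shape with a hypothetical Prm type, not the real logger package:

package main

import "fmt"

type Prm struct {
	Level            string
	PrependTimestamp bool
}

type cfg struct {
	logger *Prm
}

// loggerPrm lazily allocates the parameter struct and refreshes it from
// configuration; nothing in the body can fail, so it returns no error.
func (c *cfg) loggerPrm() *Prm {
	if c.logger == nil {
		c.logger = new(Prm)
	}
	c.logger.Level = "info"
	c.logger.PrependTimestamp = true
	return c.logger
}

func main() {
	c := &cfg{}
	fmt.Println(c.loggerPrm() == c.loggerPrm()) // true: the same instance is reused
}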
@@ -1,6 +1,7 @@
 package config

 import (
+	"slices"
 	"strings"

 	configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
@@ -52,6 +53,5 @@ func (x *Config) Value(name string) any {
 // It supports only one level of nesting and is intended to be used
 // to provide default values.
 func (x *Config) SetDefault(from *Config) {
-	x.defaultPath = make([]string, len(from.path))
-	copy(x.defaultPath, from.path)
+	x.defaultPath = slices.Clone(from.path)
 }
@@ -198,7 +198,7 @@ func (l PersistentPolicyRulesConfig) Path() string {
 //
 // Returns PermDefault if the value is not a positive number.
 func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
-	p := config.UintSafe((*config.Config)(l.cfg), "perm")
+	p := config.UintSafe(l.cfg, "perm")
 	if p == 0 {
 		p = PermDefault
 	}
@@ -210,7 +210,7 @@ func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
 //
 // Returns false if the value is not a boolean.
 func (l PersistentPolicyRulesConfig) NoSync() bool {
-	return config.BoolSafe((*config.Config)(l.cfg), "no_sync")
+	return config.BoolSafe(l.cfg, "no_sync")
 }

 // CompatibilityMode returns true if need to run node in compatibility with previous versions mode.
@@ -43,7 +43,7 @@ func initContainerService(_ context.Context, c *cfg) {
 	fatalOnErr(err)

 	cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
-	if cacheSize > 0 {
+	if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
 		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
 	}

@@ -100,7 +100,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
 		// TODO: use owner directly from the event after neofs-contract#256 will become resolved
 		//  but don't forget about the profit of reading the new container and caching it:
 		//  creation success are most commonly tracked by polling GET op.
-		cnr, err := cnrSrc.Get(ev.ID)
+		cnr, err := cnrSrc.Get(ctx, ev.ID)
 		if err == nil {
 			containerCache.containerCache.set(ev.ID, cnr, nil)
 		} else {
@@ -221,25 +221,25 @@ type morphContainerReader struct {
 	src containerCore.Source

 	lister interface {
-		ContainersOf(*user.ID) ([]cid.ID, error)
-		IterateContainersOf(*user.ID, func(cid.ID) error) error
+		ContainersOf(context.Context, *user.ID) ([]cid.ID, error)
+		IterateContainersOf(context.Context, *user.ID, func(cid.ID) error) error
 	}
 }

-func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
-	return x.src.Get(id)
+func (x *morphContainerReader) Get(ctx context.Context, id cid.ID) (*containerCore.Container, error) {
+	return x.src.Get(ctx, id)
 }

-func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo, error) {
-	return x.src.DeletionInfo(id)
+func (x *morphContainerReader) DeletionInfo(ctx context.Context, id cid.ID) (*containerCore.DelInfo, error) {
+	return x.src.DeletionInfo(ctx, id)
 }

-func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
-	return x.lister.ContainersOf(id)
+func (x *morphContainerReader) ContainersOf(ctx context.Context, id *user.ID) ([]cid.ID, error) {
+	return x.lister.ContainersOf(ctx, id)
 }

-func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
-	return x.lister.IterateContainersOf(id, processCID)
+func (x *morphContainerReader) IterateContainersOf(ctx context.Context, id *user.ID, processCID func(cid.ID) error) error {
+	return x.lister.IterateContainersOf(ctx, id, processCID)
 }

 type morphContainerWriter struct {
@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"strings"
 	"time"

@@ -42,7 +43,7 @@ func newMorphFrostfsIDCache(subjProvider frostfsidcore.SubjectProvider, size int
 	}
 }

-func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, error) {
+func (m *morphFrostfsIDCache) GetSubject(ctx context.Context, addr util.Uint160) (*client.Subject, error) {
 	hit := false
 	startedAt := time.Now()
 	defer func() {
@@ -55,7 +56,7 @@ func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, er
 		return result.subject, result.err
 	}

-	subj, err := m.subjProvider.GetSubject(addr)
+	subj, err := m.subjProvider.GetSubject(ctx, addr)
 	if err != nil {
 		if m.isCacheableError(err) {
 			m.subjCache.Add(addr, subjectWithError{
@@ -69,7 +70,7 @@ func (m *morphFrostfsIDCache) GetSubject(addr util.Uint160) (*client.Subject, er
 	return subj, nil
 }

-func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.SubjectExtended, error) {
+func (m *morphFrostfsIDCache) GetSubjectExtended(ctx context.Context, addr util.Uint160) (*client.SubjectExtended, error) {
 	hit := false
 	startedAt := time.Now()
 	defer func() {
@@ -82,7 +83,7 @@ func (m *morphFrostfsIDCache) GetSubjectExtended(addr util.Uint160) (*client.Sub
 		return result.subject, result.err
 	}

-	subjExt, err := m.subjProvider.GetSubjectExtended(addr)
+	subjExt, err := m.subjProvider.GetSubjectExtended(ctx, addr)
 	if err != nil {
 		if m.isCacheableError(err) {
 			m.subjExtCache.Add(addr, subjectExtWithError{
@@ -86,7 +86,7 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
 	}

-	s.setControlNetmapStatus(control.NetmapStatus(ctrlNetSt))
+	s.setControlNetmapStatus(ctrlNetSt)
 }

 // sets the current node state to the given value. Subsequent cfg.bootstrap
@@ -239,7 +239,7 @@ func setNetmapNotificationParser(c *cfg, sTyp string, p event.NotificationParser
 // initNetmapState inits current Network map state.
 // Must be called after Morph components initialization.
 func initNetmapState(ctx context.Context, c *cfg) {
-	epoch, err := c.cfgNetmap.wrapper.Epoch()
+	epoch, err := c.cfgNetmap.wrapper.Epoch(ctx)
 	fatalOnErrDetails("could not initialize current epoch number", err)

 	var ni *netmapSDK.NodeInfo
@@ -278,7 +278,7 @@ func nodeState(ni *netmapSDK.NodeInfo) string {
 }

 func (c *cfg) netmapInitLocalNodeState(ctx context.Context, epoch uint64) (*netmapSDK.NodeInfo, error) {
-	nmNodes, err := c.cfgNetmap.wrapper.GetCandidates()
+	nmNodes, err := c.cfgNetmap.wrapper.GetCandidates(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -291,7 +291,7 @@ func (c *cfg) netmapInitLocalNodeState(ctx context.Context, epoch uint64) (*netm
 		}
 	}

-	node, err := c.netmapLocalNodeState(epoch)
+	node, err := c.netmapLocalNodeState(ctx, epoch)
 	if err != nil {
 		return nil, err
 	}
@@ -312,9 +312,9 @@ func (c *cfg) netmapInitLocalNodeState(ctx context.Context, epoch uint64) (*netm
 	return candidate, nil
 }

-func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
+func (c *cfg) netmapLocalNodeState(ctx context.Context, epoch uint64) (*netmapSDK.NodeInfo, error) {
 	// calculate current network state
-	nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch)
+	nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(ctx, epoch)
 	if err != nil {
 		return nil, err
 	}
@@ -376,8 +376,8 @@ func (c *cfg) SetNetmapStatus(ctx context.Context, st control.NetmapStatus) erro
 	return c.updateNetMapState(ctx, func(*nmClient.UpdatePeerPrm) {})
 }

-func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) {
-	epoch, err := c.netMapSource.Epoch()
+func (c *cfg) GetNetmapStatus(ctx context.Context) (control.NetmapStatus, uint64, error) {
+	epoch, err := c.netMapSource.Epoch(ctx)
 	if err != nil {
 		return control.NetmapStatus_STATUS_UNDEFINED, 0, fmt.Errorf("failed to get current epoch: %w", err)
 	}
@@ -390,7 +390,7 @@ func (c *cfg) ForceMaintenance(ctx context.Context) error {
 }

 func (c *cfg) setMaintenanceStatus(ctx context.Context, force bool) error {
-	netSettings, err := c.cfgNetmap.wrapper.ReadNetworkConfiguration()
+	netSettings, err := c.cfgNetmap.wrapper.ReadNetworkConfiguration(ctx)
 	if err != nil {
 		err = fmt.Errorf("read network settings to check maintenance allowance: %w", err)
 	} else if !netSettings.MaintenanceModeAllowed {
@@ -438,7 +438,7 @@ type netInfo struct {
 	msPerBlockRdr func() (int64, error)
 }

-func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {
+func (n *netInfo) Dump(ctx context.Context, ver version.Version) (*netmapSDK.NetworkInfo, error) {
 	magic, err := n.magic.MagicNumber()
 	if err != nil {
 		return nil, err
@@ -448,7 +448,7 @@ func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {
 	ni.SetCurrentEpoch(n.netState.CurrentEpoch())
 	ni.SetMagicNumber(magic)

-	netInfoMorph, err := n.morphClientNetMap.ReadNetworkConfiguration()
+	netInfoMorph, err := n.morphClientNetMap.ReadNetworkConfiguration(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("read network configuration using netmap contract client: %w", err)
 	}
@@ -54,10 +54,10 @@ type objectSvc struct {
     patch *patchsvc.Service
 }

-func (c *cfg) MaxObjectSize() uint64 {
-    sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
+func (c *cfg) MaxObjectSize(ctx context.Context) uint64 {
+    sz, err := c.cfgNetmap.wrapper.MaxObjectSize(ctx)
     if err != nil {
-        c.log.Error(context.Background(), logs.FrostFSNodeCouldNotGetMaxObjectSizeValue,
+        c.log.Error(ctx, logs.FrostFSNodeCouldNotGetMaxObjectSizeValue,
             zap.Error(err),
         )
     }
@@ -122,8 +122,8 @@ type innerRingFetcherWithNotary struct {
     sidechain *morphClient.Client
 }

-func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {
-    keys, err := fn.sidechain.NeoFSAlphabetList()
+func (fn *innerRingFetcherWithNotary) InnerRingKeys(ctx context.Context) ([][]byte, error) {
+    keys, err := fn.sidechain.NeoFSAlphabetList(ctx)
     if err != nil {
         return nil, fmt.Errorf("can't get inner ring keys from alphabet role: %w", err)
     }
@@ -215,8 +215,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
         prm.MarkAsGarbage(addr)
         prm.WithForceRemoval()

-        _, err := ls.Inhume(ctx, prm)
-        return err
+        return ls.Inhume(ctx, prm)
     }

     remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
@@ -266,8 +265,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
             var inhumePrm engine.InhumePrm
             inhumePrm.MarkAsGarbage(addr)

-            _, err := ls.Inhume(ctx, inhumePrm)
-            if err != nil {
+            if err := ls.Inhume(ctx, inhumePrm); err != nil {
                 c.log.Warn(ctx, logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
                     zap.Error(err),
                 )
@@ -476,8 +474,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad

     prm.WithTarget(tombstone, addrs...)

-    _, err := e.engine.Inhume(ctx, prm)
-    return err
+    return e.engine.Inhume(ctx, prm)
 }

 func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
@@ -29,16 +29,16 @@ type cnrSource struct {
     cli *containerClient.Client
 }

-func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
-    return c.src.Get(id)
+func (c cnrSource) Get(ctx context.Context, id cid.ID) (*container.Container, error) {
+    return c.src.Get(ctx, id)
 }

-func (c cnrSource) DeletionInfo(cid cid.ID) (*container.DelInfo, error) {
-    return c.src.DeletionInfo(cid)
+func (c cnrSource) DeletionInfo(ctx context.Context, cid cid.ID) (*container.DelInfo, error) {
+    return c.src.DeletionInfo(ctx, cid)
 }

-func (c cnrSource) List() ([]cid.ID, error) {
-    return c.cli.ContainersOf(nil)
+func (c cnrSource) List(ctx context.Context) ([]cid.ID, error) {
+    return c.cli.ContainersOf(ctx, nil)
 }

 func initTreeService(c *cfg) {
@@ -79,7 +79,8 @@ contracts: # side chain NEOFS contract script hashes; optional, override values

 morph:
   dial_timeout: 30s # timeout for side chain NEO RPC client connection
-  cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls). Negative value disables caching.
+  cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls).
+  # Negative value disables caching. A zero value sets the default value.
   # Default value: block time. It is recommended to have this value less or equal to block time.
   # Cached entities: containers, container lists, eACL tables.
   container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
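The updated comment spells out three cases for `cache_ttl`. A hedged Go sketch of that decision logic, with illustrative names (`effectiveTTL`, `blockTime`) that are not taken from the node's code:

```go
package configsketch

import "time"

// effectiveTTL interprets a cache_ttl setting as documented above: a negative
// value disables caching, zero selects the default (the sidechain block time),
// and a positive value is used as-is.
func effectiveTTL(cacheTTL, blockTime time.Duration) (ttl time.Duration, enabled bool) {
	switch {
	case cacheTTL < 0:
		return 0, false // caching disabled
	case cacheTTL == 0:
		return blockTime, true // default: block time
	default:
		return cacheTTL, true
	}
}
```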
@@ -95,19 +95,15 @@ $ git push origin ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION}

 ## Post-release

-### Prepare and push images to a Docker Hub (if not automated)
+### Prepare and push images to a Docker registry (automated)

-Create Docker images for all applications and push them into Docker Hub
-(requires [organization](https://hub.docker.com/u/truecloudlab) privileges)
+Create Docker images for all applications and push them into container registry
+(executed automatically in Forgejo Actions upon pushing a release tag):

 ```shell
 $ git checkout ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION}
 $ make images
-$ docker push truecloudlab/frostfs-storage:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-storage-testnet:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-ir:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-cli:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-adm:${FROSTFS_REVISION}
+$ make push-images
 ```

 ### Make a proper release (if not automated)
2
go.mod
@@ -8,7 +8,7 @@ require (
     git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
     git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
     git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
-    git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250109084609-328d214d2d76
+    git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
     git.frostfs.info/TrueCloudLab/hrw v1.2.1
     git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
     git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
4
go.sum
@@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250109084609-328d214d2d76 h1:wzvSJIiS+p9qKfl3eg1oH6qlrjaEWiqTc/iMDKG3Ml4=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250109084609-328d214d2d76/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
@@ -146,7 +146,6 @@ const (
     ClientCantGetBlockchainHeight = "can't get blockchain height"
     ClientCantGetBlockchainHeight243 = "can't get blockchain height"
     EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool"
-    EventCouldNotStartListenToEvents = "could not start listen to events"
     EventStopEventListenerByError = "stop event listener by error"
     EventStopEventListenerByContext = "stop event listener by context"
     EventStopEventListenerByNotificationChannel = "stop event listener by notification channel"
@@ -384,7 +383,6 @@ const (
     FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown"
     FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing"
     FrostFSNodeConfigurationReading = "configuration reading"
-    FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
     FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
     FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
     FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
@@ -1,6 +1,7 @@
 package request

 import (
+    "context"
     "fmt"
     "strconv"
     "strings"
@@ -12,9 +13,9 @@ import (
 )

 // FormFrostfsIDRequestProperties forms frostfsid specific request properties like user-claim tags and group ID.
-func FormFrostfsIDRequestProperties(frostFSIDClient frostfsidcore.SubjectProvider, pk *keys.PublicKey) (map[string]string, error) {
+func FormFrostfsIDRequestProperties(ctx context.Context, frostFSIDClient frostfsidcore.SubjectProvider, pk *keys.PublicKey) (map[string]string, error) {
     reqProps := make(map[string]string)
-    subj, err := frostFSIDClient.GetSubjectExtended(pk.GetScriptHash())
+    subj, err := frostFSIDClient.GetSubjectExtended(ctx, pk.GetScriptHash())
     if err != nil {
         if !strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage) {
             return nil, fmt.Errorf("get subject error: %w", err)
@@ -36,8 +37,8 @@ func FormFrostfsIDRequestProperties(frostFSIDClient frostfsidcore.SubjectProvide
 }

 // Groups return the actor's group ids from frostfsid contract.
-func Groups(frostFSIDClient frostfsidcore.SubjectProvider, pk *keys.PublicKey) ([]string, error) {
-    subj, err := frostFSIDClient.GetSubjectExtended(pk.GetScriptHash())
+func Groups(ctx context.Context, frostFSIDClient frostfsidcore.SubjectProvider, pk *keys.PublicKey) ([]string, error) {
+    subj, err := frostFSIDClient.GetSubjectExtended(ctx, pk.GetScriptHash())
     if err != nil {
         if !strings.Contains(err.Error(), frostfsidcore.SubjectNotFoundErrorMessage) {
             return nil, fmt.Errorf("get subject error: %w", err)
@@ -1,6 +1,7 @@
 package container

 import (
+    "context"
     "sync"

     utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
@@ -19,7 +20,7 @@ type infoValue struct {
 }

 type InfoProvider interface {
-    Info(id cid.ID) (Info, error)
+    Info(ctx context.Context, id cid.ID) (Info, error)
 }

 type infoProvider struct {
@@ -43,13 +44,13 @@ func NewInfoProvider(sourceFactory func() (Source, error)) InfoProvider {
     }
 }

-func (r *infoProvider) Info(id cid.ID) (Info, error) {
+func (r *infoProvider) Info(ctx context.Context, id cid.ID) (Info, error) {
     v, found := r.tryGetFromCache(id)
     if found {
         return v.info, v.err
     }

-    return r.getFromSource(id)
+    return r.getFromSource(ctx, id)
 }

 func (r *infoProvider) tryGetFromCache(id cid.ID) (infoValue, bool) {
@@ -60,7 +61,7 @@ func (r *infoProvider) tryGetFromCache(id cid.ID) (infoValue, bool) {
     return value, found
 }

-func (r *infoProvider) getFromSource(id cid.ID) (Info, error) {
+func (r *infoProvider) getFromSource(ctx context.Context, id cid.ID) (Info, error) {
     r.kl.Lock(id)
     defer r.kl.Unlock(id)

@@ -75,11 +76,11 @@ func (r *infoProvider) getFromSource(id cid.ID) (Info, error) {
         return Info{}, r.sourceErr
     }

-    cnr, err := r.source.Get(id)
+    cnr, err := r.source.Get(ctx, id)
     var civ infoValue
     if err != nil {
         if client.IsErrContainerNotFound(err) {
-            removed, err := WasRemoved(r.source, id)
+            removed, err := WasRemoved(ctx, r.source, id)
             if err != nil {
                 civ.err = err
             } else {
@@ -1,6 +1,8 @@
 package container

 import (
+    "context"
+
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
     cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
@@ -41,9 +43,9 @@ type Source interface {
     //
     // Implementations must not retain the container pointer and modify
     // the container through it.
-    Get(cid.ID) (*Container, error)
+    Get(ctx context.Context, cid cid.ID) (*Container, error)

-    DeletionInfo(cid.ID) (*DelInfo, error)
+    DeletionInfo(ctx context.Context, cid cid.ID) (*DelInfo, error)
 }

 // EACL groups information about the FrostFS container's extended ACL stored in
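For implementers catching up with the new `Source` signatures, a minimal in-memory sketch against the interface shown above (the map-backed storage and error strings are illustrative; compare the real test doubles further down in this diff):

```go
package container

import (
	"context"
	"errors"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// mapSource is a toy Source implementation for tests: containers and deletion
// records live in plain maps, and the context parameter is accepted but unused.
type mapSource struct {
	containers map[cid.ID]*Container
	deleted    map[cid.ID]*DelInfo
}

func (s *mapSource) Get(_ context.Context, id cid.ID) (*Container, error) {
	if cnr, ok := s.containers[id]; ok {
		return cnr, nil
	}
	return nil, errors.New("container not found")
}

func (s *mapSource) DeletionInfo(_ context.Context, id cid.ID) (*DelInfo, error) {
	if di, ok := s.deleted[id]; ok {
		return di, nil
	}
	return nil, errors.New("no deletion info")
}
```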
@@ -1,6 +1,7 @@
 package container

 import (
+    "context"
     "errors"

     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -10,8 +11,8 @@ import (

 // WasRemoved checks whether the container ever existed or
 // it just has not been created yet at the current epoch.
-func WasRemoved(s Source, cid cid.ID) (bool, error) {
-    _, err := s.DeletionInfo(cid)
+func WasRemoved(ctx context.Context, s Source, cid cid.ID) (bool, error) {
+    _, err := s.DeletionInfo(ctx, cid)
     if err == nil {
         return true, nil
     }
@@ -1,6 +1,8 @@
 package frostfsid

 import (
+    "context"
+
     "git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
     "github.com/nspcc-dev/neo-go/pkg/util"
 )
@@ -11,6 +13,6 @@ const (

 // SubjectProvider interface provides methods to get subject from FrostfsID contract.
 type SubjectProvider interface {
-    GetSubject(util.Uint160) (*client.Subject, error)
-    GetSubjectExtended(util.Uint160) (*client.SubjectExtended, error)
+    GetSubject(ctx context.Context, addr util.Uint160) (*client.Subject, error)
+    GetSubjectExtended(ctx context.Context, addr util.Uint160) (*client.SubjectExtended, error)
 }
@@ -1,6 +1,8 @@
 package netmap

 import (
+    "context"
+
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 )

@@ -16,7 +18,7 @@ type Source interface {
     //
     // Implementations must not retain the network map pointer and modify
     // the network map through it.
-    GetNetMap(diff uint64) (*netmap.NetMap, error)
+    GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error)

     // GetNetMapByEpoch reads network map by the epoch number from the storage.
     // It returns the pointer to the requested network map and any error encountered.
@@ -25,21 +27,21 @@ type Source interface {
     //
     // Implementations must not retain the network map pointer and modify
     // the network map through it.
-    GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error)
+    GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error)

     // Epoch reads the current epoch from the storage.
     // It returns thw number of the current epoch and any error encountered.
     //
     // Must return exactly one non-default value.
-    Epoch() (uint64, error)
+    Epoch(ctx context.Context) (uint64, error)
 }

 // GetLatestNetworkMap requests and returns the latest network map from the storage.
-func GetLatestNetworkMap(src Source) (*netmap.NetMap, error) {
-    return src.GetNetMap(0)
+func GetLatestNetworkMap(ctx context.Context, src Source) (*netmap.NetMap, error) {
+    return src.GetNetMap(ctx, 0)
 }

 // GetPreviousNetworkMap requests and returns previous from the latest network map from the storage.
-func GetPreviousNetworkMap(src Source) (*netmap.NetMap, error) {
-    return src.GetNetMap(1)
+func GetPreviousNetworkMap(ctx context.Context, src Source) (*netmap.NetMap, error) {
+    return src.GetNetMap(ctx, 1)
 }
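With `Source` now context-aware, callers can bound netmap reads with cancellation or deadlines. A hedged usage sketch using only the helpers defined in the hunk above; the timeout value and error wrapping are illustrative choices, not taken from the node:

```go
package netmap

import (
	"context"
	"fmt"
	"time"
)

// snapshotNetmaps reads the latest (diff = 0) and previous (diff = 1) network
// maps through the context-aware Source API.
func snapshotNetmaps(ctx context.Context, src Source) error {
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second) // illustrative bound
	defer cancel()

	latest, err := GetLatestNetworkMap(ctx, src)
	if err != nil {
		return fmt.Errorf("latest netmap: %w", err)
	}
	previous, err := GetPreviousNetworkMap(ctx, src)
	if err != nil {
		return fmt.Errorf("previous netmap: %w", err)
	}
	_, _ = latest, previous // hand off to the caller in real code
	return nil
}
```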
@@ -199,7 +199,7 @@ func (v *FormatValidator) isIROrContainerNode(ctx context.Context, obj *objectSD
     cnrIDBin := make([]byte, sha256.Size)
     cnrID.Encode(cnrIDBin)

-    cnr, err := v.containers.Get(cnrID)
+    cnr, err := v.containers.Get(ctx, cnrID)
     if err != nil {
         return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
     }
@@ -578,7 +578,7 @@ type testIRSource struct {
     irNodes [][]byte
 }

-func (s *testIRSource) InnerRingKeys() ([][]byte, error) {
+func (s *testIRSource) InnerRingKeys(_ context.Context) ([][]byte, error) {
     return s.irNodes, nil
 }

@@ -586,14 +586,14 @@ type testContainerSource struct {
     containers map[cid.ID]*container.Container
 }

-func (s *testContainerSource) Get(cnrID cid.ID) (*container.Container, error) {
+func (s *testContainerSource) Get(ctx context.Context, cnrID cid.ID) (*container.Container, error) {
     if cnr, found := s.containers[cnrID]; found {
         return cnr, nil
     }
     return nil, fmt.Errorf("container not found")
 }

-func (s *testContainerSource) DeletionInfo(cid.ID) (*container.DelInfo, error) {
+func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
     return nil, nil
 }

@@ -602,20 +602,20 @@ type testNetmapSource struct {
     currentEpoch uint64
 }

-func (s *testNetmapSource) GetNetMap(diff uint64) (*netmap.NetMap, error) {
+func (s *testNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
     if diff >= s.currentEpoch {
         return nil, fmt.Errorf("invalid diff")
     }
-    return s.GetNetMapByEpoch(s.currentEpoch - diff)
+    return s.GetNetMapByEpoch(ctx, s.currentEpoch-diff)
 }

-func (s *testNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) {
+func (s *testNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error) {
     if nm, found := s.netmaps[epoch]; found {
         return nm, nil
     }
     return nil, fmt.Errorf("netmap not found")
 }

-func (s *testNetmapSource) Epoch() (uint64, error) {
+func (s *testNetmapSource) Epoch(ctx context.Context) (uint64, error) {
     return s.currentEpoch, nil
 }
@@ -18,7 +18,7 @@ import (
 )

 type InnerRing interface {
-    InnerRingKeys() ([][]byte, error)
+    InnerRingKeys(ctx context.Context) ([][]byte, error)
 }

 type SenderClassifier struct {
@@ -63,7 +63,7 @@ func (c SenderClassifier) Classify(
 }

 func (c SenderClassifier) IsInnerRingOrContainerNode(ctx context.Context, ownerKeyInBytes []byte, idCnr cid.ID, cnr container.Container) (*ClassifyResult, error) {
-    isInnerRingNode, err := c.isInnerRingKey(ownerKeyInBytes)
+    isInnerRingNode, err := c.isInnerRingKey(ctx, ownerKeyInBytes)
     if err != nil {
         // do not throw error, try best case matching
         c.log.Debug(ctx, logs.V2CantCheckIfRequestFromInnerRing,
@@ -78,7 +78,7 @@ func (c SenderClassifier) IsInnerRingOrContainerNode(ctx context.Context, ownerK
     binCnr := make([]byte, sha256.Size)
     idCnr.Encode(binCnr)

-    isContainerNode, err := c.isContainerKey(ownerKeyInBytes, binCnr, cnr)
+    isContainerNode, err := c.isContainerKey(ctx, ownerKeyInBytes, binCnr, cnr)
     if err != nil {
         // error might happen if request has `RoleOther` key and placement
         // is not possible for previous epoch, so
@@ -99,8 +99,8 @@ func (c SenderClassifier) IsInnerRingOrContainerNode(ctx context.Context, ownerK
     }, nil
 }

-func (c SenderClassifier) isInnerRingKey(owner []byte) (bool, error) {
-    innerRingKeys, err := c.innerRing.InnerRingKeys()
+func (c SenderClassifier) isInnerRingKey(ctx context.Context, owner []byte) (bool, error) {
+    innerRingKeys, err := c.innerRing.InnerRingKeys(ctx)
     if err != nil {
         return false, err
     }
@@ -116,10 +116,11 @@ func (c SenderClassifier) isInnerRingKey(owner []byte) (bool, error) {
 }

 func (c SenderClassifier) isContainerKey(
+    ctx context.Context,
     owner, idCnr []byte,
     cnr container.Container,
 ) (bool, error) {
-    nm, err := core.GetLatestNetworkMap(c.netmap) // first check current netmap
+    nm, err := core.GetLatestNetworkMap(ctx, c.netmap) // first check current netmap
     if err != nil {
         return false, err
     }
@@ -133,7 +134,7 @@ func (c SenderClassifier) isContainerKey(

     // then check previous netmap, this can happen in-between epoch change
     // when node migrates data from last epoch container
-    nm, err = core.GetPreviousNetworkMap(c.netmap)
+    nm, err = core.GetPreviousNetworkMap(ctx, c.netmap)
     if err != nil {
         return false, err
     }
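Distilled, the container-key check above consults the latest netmap first and falls back to the previous one, covering requests that arrive in-between an epoch change while nodes still migrate data. A hedged sketch; `inPlacement` is a hypothetical stand-in for the real placement lookup, and the import path follows the aliases visible in the diff:

```go
package classifiersketch

import (
	"context"

	core "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

// inPlacement is a hypothetical helper: the real code derives the container's
// placement from the netmap and searches it for the sender's key. Stubbed here.
func inPlacement(nm *netmap.NetMap, key []byte) bool { return false }

func isContainerNode(ctx context.Context, src core.Source, key []byte) (bool, error) {
	nm, err := core.GetLatestNetworkMap(ctx, src) // current epoch first
	if err != nil {
		return false, err
	}
	if inPlacement(nm, key) {
		return true, nil
	}
	// Fall back to the previous netmap: a node may still be serving data
	// placed under the last epoch's container placement.
	nm, err = core.GetPreviousNetworkMap(ctx, src)
	if err != nil {
		return false, err
	}
	return inPlacement(nm, key), nil
}
```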
@@ -1,6 +1,8 @@
 package innerring

 import (
+    "context"
+
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
     nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@@ -47,12 +49,12 @@ type IrFetcherWithoutNotary struct {

 // InnerRingKeys fetches list of innerring keys from NeoFSAlphabet
 // role in the sidechain.
-func (fN IrFetcherWithNotary) InnerRingKeys() (keys.PublicKeys, error) {
-    return fN.cli.NeoFSAlphabetList()
+func (fN IrFetcherWithNotary) InnerRingKeys(ctx context.Context) (keys.PublicKeys, error) {
+    return fN.cli.NeoFSAlphabetList(ctx)
 }

 // InnerRingKeys fetches list of innerring keys from netmap contract
 // in the sidechain.
-func (f IrFetcherWithoutNotary) InnerRingKeys() (keys.PublicKeys, error) {
-    return f.nm.GetInnerRingList()
+func (f IrFetcherWithoutNotary) InnerRingKeys(ctx context.Context) (keys.PublicKeys, error) {
+    return f.nm.GetInnerRingList(ctx)
 }
@@ -1,6 +1,7 @@
 package innerring

 import (
+    "context"
     "fmt"
     "sync"
     "time"
@@ -10,7 +11,7 @@ import (

 type (
     irFetcher interface {
-        InnerRingKeys() (keys.PublicKeys, error)
+        InnerRingKeys(ctx context.Context) (keys.PublicKeys, error)
     }

     committeeFetcher interface {
@@ -45,7 +46,7 @@ func newInnerRingIndexer(comf committeeFetcher, irf irFetcher, key *keys.PublicK
     }
 }

-func (s *innerRingIndexer) update() (ind indexes, err error) {
+func (s *innerRingIndexer) update(ctx context.Context) (ind indexes, err error) {
     s.RLock()

     if time.Since(s.lastAccess) < s.timeout {
@@ -62,7 +63,7 @@ func (s *innerRingIndexer) update() (ind indexes, err error) {
         return s.ind, nil
     }

-    innerRing, err := s.irFetcher.InnerRingKeys()
+    innerRing, err := s.irFetcher.InnerRingKeys(ctx)
     if err != nil {
         return indexes{}, err
     }
@@ -81,8 +82,8 @@ func (s *innerRingIndexer) update() (ind indexes, err error) {
     return s.ind, nil
 }

-func (s *innerRingIndexer) InnerRingIndex() (int32, error) {
-    ind, err := s.update()
+func (s *innerRingIndexer) InnerRingIndex(ctx context.Context) (int32, error) {
+    ind, err := s.update(ctx)
     if err != nil {
         return 0, fmt.Errorf("can't update index state: %w", err)
     }
@@ -90,8 +91,8 @@ func (s *innerRingIndexer) InnerRingIndex() (int32, error) {
     return ind.innerRingIndex, nil
 }

-func (s *innerRingIndexer) InnerRingSize() (int32, error) {
-    ind, err := s.update()
+func (s *innerRingIndexer) InnerRingSize(ctx context.Context) (int32, error) {
+    ind, err := s.update(ctx)
     if err != nil {
         return 0, fmt.Errorf("can't update index state: %w", err)
     }
@@ -99,8 +100,8 @@ func (s *innerRingIndexer) InnerRingSize() (int32, error) {
     return ind.innerRingSize, nil
 }

-func (s *innerRingIndexer) AlphabetIndex() (int32, error) {
-    ind, err := s.update()
+func (s *innerRingIndexer) AlphabetIndex(ctx context.Context) (int32, error) {
+    ind, err := s.update(ctx)
     if err != nil {
         return 0, fmt.Errorf("can't update index state: %w", err)
     }
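The indexer above refreshes the committee and inner-ring indexes at most once per `timeout`, using a read-lock fast path. Stripped of the frostfs-specific fields, the guard is the classic double-checked TTL cache; a simplified sketch with illustrative names:

```go
package indexersketch

import (
	"context"
	"sync"
	"time"
)

type ttlIndexer struct {
	sync.RWMutex
	cached     int32
	lastAccess time.Time
	timeout    time.Duration
	fetch      func(ctx context.Context) (int32, error)
}

func (s *ttlIndexer) value(ctx context.Context) (int32, error) {
	s.RLock()
	if time.Since(s.lastAccess) < s.timeout { // fresh: cheap read-locked path
		v := s.cached
		s.RUnlock()
		return v, nil
	}
	s.RUnlock()

	s.Lock()
	defer s.Unlock()
	if time.Since(s.lastAccess) < s.timeout { // re-check under the write lock
		return s.cached, nil
	}
	v, err := s.fetch(ctx) // stale: refetch, now cancellable via ctx
	if err != nil {
		return 0, err
	}
	s.cached = v
	s.lastAccess = time.Now()
	return v, nil
}
```

The re-check under the write lock keeps concurrent callers from refetching more than once per expiry, which matches the call-count assertions in the tests that follow.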
@@ -1,6 +1,7 @@
 package innerring

 import (
+    "context"
     "fmt"
     "sync/atomic"
     "testing"
@@ -37,15 +38,15 @@ func TestIndexerReturnsIndexes(t *testing.T) {

         indexer := newInnerRingIndexer(cf, irf, key, time.Second)

-        idx, err := indexer.AlphabetIndex()
+        idx, err := indexer.AlphabetIndex(context.Background())
         require.NoError(t, err, "failed to get alphabet index")
         require.Equal(t, int32(1), idx, "invalid alphabet index")

-        idx, err = indexer.InnerRingIndex()
+        idx, err = indexer.InnerRingIndex(context.Background())
         require.NoError(t, err, "failed to get IR index")
         require.Equal(t, int32(2), idx, "invalid IR index")

-        size, err := indexer.InnerRingSize()
+        size, err := indexer.InnerRingSize(context.Background())
         require.NoError(t, err, "failed to get IR size")
         require.Equal(t, int32(3), size, "invalid IR size")
     })
@@ -56,11 +57,11 @@ func TestIndexerReturnsIndexes(t *testing.T) {

         indexer := newInnerRingIndexer(cf, irf, key, time.Second)

-        idx, err := indexer.AlphabetIndex()
+        idx, err := indexer.AlphabetIndex(context.Background())
         require.NoError(t, err, "failed to get alphabet index")
         require.Equal(t, int32(-1), idx, "invalid alphabet index")

-        idx, err = indexer.InnerRingIndex()
+        idx, err = indexer.InnerRingIndex(context.Background())
         require.NoError(t, err, "failed to get IR index")
         require.Equal(t, int32(0), idx, "invalid IR index")
     })
@@ -71,11 +72,11 @@ func TestIndexerReturnsIndexes(t *testing.T) {

         indexer := newInnerRingIndexer(cf, irf, key, time.Second)

-        idx, err := indexer.AlphabetIndex()
+        idx, err := indexer.AlphabetIndex(context.Background())
         require.NoError(t, err, "failed to get alphabet index")
         require.Equal(t, int32(0), idx, "invalid alphabet index")

-        idx, err = indexer.InnerRingIndex()
+        idx, err = indexer.InnerRingIndex(context.Background())
         require.NoError(t, err, "failed to get IR index")
         require.Equal(t, int32(-1), idx, "invalid IR index")
     })
@@ -100,30 +101,30 @@ func TestIndexerCachesIndexes(t *testing.T) {

     indexer := newInnerRingIndexer(cf, irf, key, time.Second)

-    idx, err := indexer.AlphabetIndex()
+    idx, err := indexer.AlphabetIndex(context.Background())
     require.NoError(t, err, "failed to get alphabet index")
     require.Equal(t, int32(-1), idx, "invalid alphabet index")

-    idx, err = indexer.InnerRingIndex()
+    idx, err = indexer.InnerRingIndex(context.Background())
     require.NoError(t, err, "failed to get IR index")
     require.Equal(t, int32(-1), idx, "invalid IR index")

-    size, err := indexer.InnerRingSize()
+    size, err := indexer.InnerRingSize(context.Background())
     require.NoError(t, err, "failed to get IR size")
     require.Equal(t, int32(0), size, "invalid IR size")

     require.Equal(t, int32(1), cf.calls.Load(), "invalid commitee calls count")
     require.Equal(t, int32(1), irf.calls.Load(), "invalid IR calls count")

-    idx, err = indexer.AlphabetIndex()
+    idx, err = indexer.AlphabetIndex(context.Background())
     require.NoError(t, err, "failed to get alphabet index")
     require.Equal(t, int32(-1), idx, "invalid alphabet index")

-    idx, err = indexer.InnerRingIndex()
+    idx, err = indexer.InnerRingIndex(context.Background())
     require.NoError(t, err, "failed to get IR index")
     require.Equal(t, int32(-1), idx, "invalid IR index")

-    size, err = indexer.InnerRingSize()
+    size, err = indexer.InnerRingSize(context.Background())
     require.NoError(t, err, "failed to get IR size")
     require.Equal(t, int32(0), size, "invalid IR size")

@@ -132,15 +133,15 @@ func TestIndexerCachesIndexes(t *testing.T) {

     time.Sleep(2 * time.Second)

-    idx, err = indexer.AlphabetIndex()
+    idx, err = indexer.AlphabetIndex(context.Background())
     require.NoError(t, err, "failed to get alphabet index")
     require.Equal(t, int32(-1), idx, "invalid alphabet index")

-    idx, err = indexer.InnerRingIndex()
+    idx, err = indexer.InnerRingIndex(context.Background())
     require.NoError(t, err, "failed to get IR index")
     require.Equal(t, int32(-1), idx, "invalid IR index")

-    size, err = indexer.InnerRingSize()
+    size, err = indexer.InnerRingSize(context.Background())
     require.NoError(t, err, "failed to get IR size")
     require.Equal(t, int32(0), size, "invalid IR size")

@@ -165,15 +166,15 @@ func TestIndexerThrowsErrors(t *testing.T) {

     indexer := newInnerRingIndexer(cf, irf, key, time.Second)

-    idx, err := indexer.AlphabetIndex()
+    idx, err := indexer.AlphabetIndex(context.Background())
     require.ErrorContains(t, err, "test commitee error", "error from commitee not throwed")
     require.Equal(t, int32(0), idx, "invalid alphabet index")

-    idx, err = indexer.InnerRingIndex()
+    idx, err = indexer.InnerRingIndex(context.Background())
     require.ErrorContains(t, err, "test commitee error", "error from IR not throwed")
     require.Equal(t, int32(0), idx, "invalid IR index")

-    size, err := indexer.InnerRingSize()
+    size, err := indexer.InnerRingSize(context.Background())
     require.ErrorContains(t, err, "test commitee error", "error from IR not throwed")
     require.Equal(t, int32(0), size, "invalid IR size")

@@ -189,15 +190,15 @@ func TestIndexerThrowsErrors(t *testing.T) {

     indexer = newInnerRingIndexer(cf, irf, key, time.Second)

-    idx, err = indexer.AlphabetIndex()
+    idx, err = indexer.AlphabetIndex(context.Background())
     require.ErrorContains(t, err, "test IR error", "error from commitee not throwed")
     require.Equal(t, int32(0), idx, "invalid alphabet index")

-    idx, err = indexer.InnerRingIndex()
+    idx, err = indexer.InnerRingIndex(context.Background())
     require.ErrorContains(t, err, "test IR error", "error from IR not throwed")
     require.Equal(t, int32(0), idx, "invalid IR index")

-    size, err = indexer.InnerRingSize()
+    size, err = indexer.InnerRingSize(context.Background())
     require.ErrorContains(t, err, "test IR error", "error from IR not throwed")
     require.Equal(t, int32(0), size, "invalid IR size")
 }
@@ -219,7 +220,7 @@ type testIRFetcher struct {
     calls atomic.Int32
 }

-func (f *testIRFetcher) InnerRingKeys() (keys.PublicKeys, error) {
+func (f *testIRFetcher) InnerRingKeys(context.Context) (keys.PublicKeys, error) {
     f.calls.Add(1)
     return f.keys, f.err
 }
@@ -38,10 +38,7 @@ import (
 func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
     alphaSync event.Handler,
 ) error {
-    locodeValidator, err := s.newLocodeValidator(cfg)
-    if err != nil {
-        return err
-    }
+    locodeValidator := s.newLocodeValidator(cfg)

     netSettings := (*networkSettings)(s.netmapClient)

@@ -51,6 +48,7 @@ func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
     poolSize := cfg.GetInt("workers.netmap")
     s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))

+    var err error
     s.netmapProcessor, err = netmap.New(&netmap.Params{
         Log: s.log,
         Metrics: s.irMetrics,
@@ -575,19 +575,19 @@ func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNe

 func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
     // get current epoch
-    epoch, err := s.netmapClient.Epoch()
+    epoch, err := s.netmapClient.Epoch(ctx)
     if err != nil {
         return fmt.Errorf("can't read epoch number: %w", err)
     }

     // get current epoch duration
-    epochDuration, err := s.netmapClient.EpochDuration()
+    epochDuration, err := s.netmapClient.EpochDuration(ctx)
     if err != nil {
         return fmt.Errorf("can't read epoch duration: %w", err)
     }

     // get balance precision
-    balancePrecision, err := s.balanceClient.Decimals()
+    balancePrecision, err := s.balanceClient.Decimals(ctx)
     if err != nil {
         return fmt.Errorf("can't read balance contract precision: %w", err)
     }
@@ -597,7 +597,7 @@ func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
     s.precision.SetBalancePrecision(balancePrecision)

     // get next epoch delta tick
-    s.initialEpochTickDelta, err = s.nextEpochBlockDelta()
+    s.initialEpochTickDelta, err = s.nextEpochBlockDelta(ctx)
     if err != nil {
         return err
     }
@@ -613,8 +613,8 @@ func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
     return nil
 }

-func (s *Server) nextEpochBlockDelta() (uint32, error) {
-    epochBlock, err := s.netmapClient.LastEpochBlock()
+func (s *Server) nextEpochBlockDelta(ctx context.Context) (uint32, error) {
+    epochBlock, err := s.netmapClient.LastEpochBlock(ctx)
     if err != nil {
         return 0, fmt.Errorf("can't read last epoch block: %w", err)
     }
@@ -9,7 +9,7 @@ import (
     "github.com/spf13/viper"
 )

-func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, error) {
+func (s *Server) newLocodeValidator(cfg *viper.Viper) netmap.NodeValidator {
     locodeDB := locodebolt.New(locodebolt.Prm{
         Path: cfg.GetString("locode.db.path"),
     },
@@ -21,7 +21,7 @@ func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, err

     return irlocode.New(irlocode.Prm{
         DB: (*locodeBoltDBWrapper)(locodeDB),
-    }), nil
+    })
 }

 type locodeBoltEntryWrapper struct {
@@ -1,6 +1,7 @@
 package innerring

 import (
+    "context"
     "fmt"

     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
@@ -17,8 +18,8 @@ type networkSettings netmapclient.Client
 // MaintenanceModeAllowed requests network configuration from the Sidechain
 // and check allowance of storage node's maintenance mode according to it.
 // Always returns state.ErrMaintenanceModeDisallowed.
-func (s *networkSettings) MaintenanceModeAllowed() error {
-    allowed, err := (*netmapclient.Client)(s).MaintenanceModeAllowed()
+func (s *networkSettings) MaintenanceModeAllowed(ctx context.Context) error {
+    allowed, err := (*netmapclient.Client)(s).MaintenanceModeAllowed(ctx)
     if err != nil {
         return fmt.Errorf("read maintenance mode's allowance from the Sidechain: %w", err)
     } else if allowed {
@@ -279,6 +279,6 @@ type testNetmapClient struct {
     netmap *netmap.NetMap
 }

-func (c *testNetmapClient) NetMap() (*netmap.NetMap, error) {
+func (c *testNetmapClient) NetMap(context.Context) (*netmap.NetMap, error) {
     return c.netmap, nil
 }

@@ -44,7 +44,7 @@ func (ap *Processor) processEmit(ctx context.Context) bool {
         return true
     }

-    networkMap, err := ap.netmapClient.NetMap()
+    networkMap, err := ap.netmapClient.NetMap(ctx)
     if err != nil {
         ap.log.Warn(ctx, logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
             zap.Error(err))

@@ -36,7 +36,7 @@ type (
     }

     netmapClient interface {
-        NetMap() (*netmap.NetMap, error)
+        NetMap(ctx context.Context) (*netmap.NetMap, error)
     }

     morphClient interface {
@@ -1,6 +1,7 @@
 package container

 import (
+    "context"
     "crypto/ecdsa"
     "errors"
     "fmt"
@@ -45,7 +46,7 @@ type signatureVerificationData struct {
 // - v.binPublicKey is a public session key
 // - session context corresponds to the container and verb in v
 // - session is "alive"
-func (cp *Processor) verifySignature(v signatureVerificationData) error {
+func (cp *Processor) verifySignature(ctx context.Context, v signatureVerificationData) error {
     var err error
     var key frostfsecdsa.PublicKeyRFC6979
     keyProvided := v.binPublicKey != nil
@@ -58,7 +59,7 @@ func (cp *Processor) verifySignature(v signatureVerificationData) error {
     }

     if len(v.binTokenSession) > 0 {
-        return cp.verifyByTokenSession(v, &key, keyProvided)
+        return cp.verifyByTokenSession(ctx, v, &key, keyProvided)
     }

     if keyProvided {
@@ -77,8 +78,8 @@ func (cp *Processor) verifySignature(v signatureVerificationData) error {
     return errors.New("signature is invalid or calculated with the key not bound to the container owner")
 }

-func (cp *Processor) checkTokenLifetime(token session.Container) error {
-    curEpoch, err := cp.netState.Epoch()
+func (cp *Processor) checkTokenLifetime(ctx context.Context, token session.Container) error {
+    curEpoch, err := cp.netState.Epoch(ctx)
     if err != nil {
         return fmt.Errorf("could not read current epoch: %w", err)
     }
@@ -90,7 +91,7 @@ func (cp *Processor) checkTokenLifetime(token session.Container) error {
     return nil
 }

-func (cp *Processor) verifyByTokenSession(v signatureVerificationData, key *frostfsecdsa.PublicKeyRFC6979, keyProvided bool) error {
+func (cp *Processor) verifyByTokenSession(ctx context.Context, v signatureVerificationData, key *frostfsecdsa.PublicKeyRFC6979, keyProvided bool) error {
     var tok session.Container

     err := tok.Unmarshal(v.binTokenSession)
@@ -118,7 +119,7 @@ func (cp *Processor) verifyByTokenSession(v signatureVerificationData, key *fros
         return errors.New("owner differs with token owner")
     }

-    err = cp.checkTokenLifetime(tok)
+    err = cp.checkTokenLifetime(ctx, tok)
     if err != nil {
         return fmt.Errorf("check session lifetime: %w", err)
     }

@@ -170,11 +170,11 @@ type testNetworkState struct {
     epoch uint64
 }

-func (s *testNetworkState) HomomorphicHashDisabled() (bool, error) {
+func (s *testNetworkState) HomomorphicHashDisabled(context.Context) (bool, error) {
     return s.homHashDisabled, nil
 }

-func (s *testNetworkState) Epoch() (uint64, error) {
+func (s *testNetworkState) Epoch(context.Context) (uint64, error) {
     return s.epoch, nil
 }

@@ -187,7 +187,7 @@ func (c *testContainerClient) ContractAddress() util.Uint160 {
     return c.contractAddress
 }

-func (c *testContainerClient) Get(cid []byte) (*containercore.Container, error) {
+func (c *testContainerClient) Get(ctx context.Context, cid []byte) (*containercore.Container, error) {
     key := hex.EncodeToString(cid)
     if cont, found := c.get[key]; found {
         return cont, nil
@@ -237,6 +237,6 @@ func (c *testMorphClient) NotarySignAndInvokeTX(mainTx *transaction.Transaction)

 type testFrostFSIDClient struct{}

-func (c *testFrostFSIDClient) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error) {
+func (c *testFrostFSIDClient) GetSubject(ctx context.Context, addr util.Uint160) (*frostfsidclient.Subject, error) {
|
||||||
return &frostfsidclient.Subject{}, nil
|
return &frostfsidclient.Subject{}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ func (cp *Processor) processContainerPut(ctx context.Context, put putEvent) bool
|
||||||
e: put,
|
e: put,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := cp.checkPutContainer(pctx)
|
err := cp.checkPutContainer(ctx, pctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cp.log.Error(ctx, logs.ContainerPutContainerCheckFailed,
|
cp.log.Error(ctx, logs.ContainerPutContainerCheckFailed,
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
|
@ -66,8 +66,8 @@ func (cp *Processor) processContainerPut(ctx context.Context, put putEvent) bool
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
func (cp *Processor) checkPutContainer(ctx context.Context, pctx *putContainerContext) error {
|
||||||
binCnr := ctx.e.Container()
|
binCnr := pctx.e.Container()
|
||||||
var cnr containerSDK.Container
|
var cnr containerSDK.Container
|
||||||
|
|
||||||
err := cnr.Unmarshal(binCnr)
|
err := cnr.Unmarshal(binCnr)
|
||||||
|
@ -75,12 +75,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
||||||
return fmt.Errorf("invalid binary container: %w", err)
|
return fmt.Errorf("invalid binary container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cp.verifySignature(signatureVerificationData{
|
err = cp.verifySignature(ctx, signatureVerificationData{
|
||||||
ownerContainer: cnr.Owner(),
|
ownerContainer: cnr.Owner(),
|
||||||
verb: session.VerbContainerPut,
|
verb: session.VerbContainerPut,
|
||||||
binTokenSession: ctx.e.SessionToken(),
|
binTokenSession: pctx.e.SessionToken(),
|
||||||
binPublicKey: ctx.e.PublicKey(),
|
binPublicKey: pctx.e.PublicKey(),
|
||||||
signature: ctx.e.Signature(),
|
signature: pctx.e.Signature(),
|
||||||
signedData: binCnr,
|
signedData: binCnr,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -88,13 +88,13 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check homomorphic hashing setting
|
// check homomorphic hashing setting
|
||||||
err = checkHomomorphicHashing(cp.netState, cnr)
|
err = checkHomomorphicHashing(ctx, cp.netState, cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("incorrect homomorphic hashing setting: %w", err)
|
return fmt.Errorf("incorrect homomorphic hashing setting: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// check native name and zone
|
// check native name and zone
|
||||||
err = cp.checkNNS(ctx, cnr)
|
err = cp.checkNNS(ctx, pctx, cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("NNS: %w", err)
|
return fmt.Errorf("NNS: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ func (cp *Processor) processContainerDelete(ctx context.Context, e containerEven
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := cp.checkDeleteContainer(e)
|
err := cp.checkDeleteContainer(ctx, e)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cp.log.Error(ctx, logs.ContainerDeleteContainerCheckFailed,
|
cp.log.Error(ctx, logs.ContainerDeleteContainerCheckFailed,
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
|
@ -130,7 +130,7 @@ func (cp *Processor) processContainerDelete(ctx context.Context, e containerEven
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
func (cp *Processor) checkDeleteContainer(ctx context.Context, e containerEvent.Delete) error {
|
||||||
binCnr := e.ContainerID()
|
binCnr := e.ContainerID()
|
||||||
|
|
||||||
var idCnr cid.ID
|
var idCnr cid.ID
|
||||||
|
@ -141,12 +141,12 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// receive owner of the related container
|
// receive owner of the related container
|
||||||
cnr, err := cp.cnrClient.Get(binCnr)
|
cnr, err := cp.cnrClient.Get(ctx, binCnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not receive the container: %w", err)
|
return fmt.Errorf("could not receive the container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cp.verifySignature(signatureVerificationData{
|
err = cp.verifySignature(ctx, signatureVerificationData{
|
||||||
ownerContainer: cnr.Value.Owner(),
|
ownerContainer: cnr.Value.Owner(),
|
||||||
verb: session.VerbContainerDelete,
|
verb: session.VerbContainerDelete,
|
||||||
idContainerSet: true,
|
idContainerSet: true,
|
||||||
|
@ -163,21 +163,21 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
|
func (cp *Processor) checkNNS(ctx context.Context, pctx *putContainerContext, cnr containerSDK.Container) error {
|
||||||
// fetch domain info
|
// fetch domain info
|
||||||
ctx.d = containerSDK.ReadDomain(cnr)
|
pctx.d = containerSDK.ReadDomain(cnr)
|
||||||
|
|
||||||
// if PutNamed event => check if values in container correspond to args
|
// if PutNamed event => check if values in container correspond to args
|
||||||
if named, ok := ctx.e.(interface {
|
if named, ok := pctx.e.(interface {
|
||||||
Name() string
|
Name() string
|
||||||
Zone() string
|
Zone() string
|
||||||
}); ok {
|
}); ok {
|
||||||
if name := named.Name(); name != ctx.d.Name() {
|
if name := named.Name(); name != pctx.d.Name() {
|
||||||
return fmt.Errorf("names differ %s/%s", name, ctx.d.Name())
|
return fmt.Errorf("names differ %s/%s", name, pctx.d.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
if zone := named.Zone(); zone != ctx.d.Zone() {
|
if zone := named.Zone(); zone != pctx.d.Zone() {
|
||||||
return fmt.Errorf("zones differ %s/%s", zone, ctx.d.Zone())
|
return fmt.Errorf("zones differ %s/%s", zone, pctx.d.Zone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,12 +186,12 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
|
||||||
return fmt.Errorf("could not get container owner address: %w", err)
|
return fmt.Errorf("could not get container owner address: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
subject, err := cp.frostFSIDClient.GetSubject(addr)
|
subject, err := cp.frostFSIDClient.GetSubject(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get subject from FrostfsID contract: %w", err)
|
return fmt.Errorf("could not get subject from FrostfsID contract: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
|
namespace, hasNamespace := strings.CutSuffix(pctx.d.Zone(), ".ns")
|
||||||
if !hasNamespace {
|
if !hasNamespace {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -203,8 +203,8 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkHomomorphicHashing(ns NetworkState, cnr containerSDK.Container) error {
|
func checkHomomorphicHashing(ctx context.Context, ns NetworkState, cnr containerSDK.Container) error {
|
||||||
netSetting, err := ns.HomomorphicHashDisabled()
|
netSetting, err := ns.HomomorphicHashDisabled(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get setting in contract: %w", err)
|
return fmt.Errorf("could not get setting in contract: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ type (
|
||||||
|
|
||||||
ContClient interface {
|
ContClient interface {
|
||||||
ContractAddress() util.Uint160
|
ContractAddress() util.Uint160
|
||||||
Get(cid []byte) (*containercore.Container, error)
|
Get(ctx context.Context, cid []byte) (*containercore.Container, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
MorphClient interface {
|
MorphClient interface {
|
||||||
|
@ -33,7 +33,7 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
FrostFSIDClient interface {
|
FrostFSIDClient interface {
|
||||||
GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error)
|
GetSubject(ctx context.Context, addr util.Uint160) (*frostfsidclient.Subject, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Processor of events produced by container contract in the sidechain.
|
// Processor of events produced by container contract in the sidechain.
|
||||||
|
@ -68,7 +68,7 @@ type NetworkState interface {
|
||||||
//
|
//
|
||||||
// Must return any error encountered
|
// Must return any error encountered
|
||||||
// which did not allow reading the value.
|
// which did not allow reading the value.
|
||||||
Epoch() (uint64, error)
|
Epoch(ctx context.Context) (uint64, error)
|
||||||
|
|
||||||
// HomomorphicHashDisabled must return boolean that
|
// HomomorphicHashDisabled must return boolean that
|
||||||
// represents homomorphic network state:
|
// represents homomorphic network state:
|
||||||
|
@ -76,7 +76,7 @@ type NetworkState interface {
|
||||||
// * false if hashing is enabled.
|
// * false if hashing is enabled.
|
||||||
//
|
//
|
||||||
// which did not allow reading the value.
|
// which did not allow reading the value.
|
||||||
HomomorphicHashDisabled() (bool, error)
|
HomomorphicHashDisabled(ctx context.Context) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a container contract processor instance.
|
// New creates a container contract processor instance.
|
||||||
|
|
|
@ -236,7 +236,7 @@ type testIRFetcher struct {
|
||||||
publicKeys keys.PublicKeys
|
publicKeys keys.PublicKeys
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *testIRFetcher) InnerRingKeys() (keys.PublicKeys, error) {
|
func (f *testIRFetcher) InnerRingKeys(context.Context) (keys.PublicKeys, error) {
|
||||||
return f.publicKeys, nil
|
return f.publicKeys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +266,7 @@ type testMainnetClient struct {
|
||||||
designateHash util.Uint160
|
designateHash util.Uint160
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testMainnetClient) NeoFSAlphabetList() (res keys.PublicKeys, err error) {
|
func (c *testMainnetClient) NeoFSAlphabetList(context.Context) (res keys.PublicKeys, err error) {
|
||||||
return c.alphabetKeys, nil
|
return c.alphabetKeys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ func (gp *Processor) processAlphabetSync(ctx context.Context, txHash util.Uint25
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet,
|
gp.log.Error(ctx, logs.GovernanceCantFetchAlphabetListFromMainNet,
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
@ -95,7 +95,7 @@ func prettyKeys(keys keys.PublicKeys) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gp *Processor) updateNeoFSAlphabetRoleInSidechain(ctx context.Context, sidechainAlphabet, newAlphabet keys.PublicKeys, txHash util.Uint256) {
|
func (gp *Processor) updateNeoFSAlphabetRoleInSidechain(ctx context.Context, sidechainAlphabet, newAlphabet keys.PublicKeys, txHash util.Uint256) {
|
||||||
innerRing, err := gp.irFetcher.InnerRingKeys()
|
innerRing, err := gp.irFetcher.InnerRingKeys(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(ctx, logs.GovernanceCantFetchInnerRingListFromSideChain,
|
gp.log.Error(ctx, logs.GovernanceCantFetchInnerRingListFromSideChain,
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
|
|
@ -52,7 +52,7 @@ type (
|
||||||
// Implementation must take into account availability of
|
// Implementation must take into account availability of
|
||||||
// the notary contract.
|
// the notary contract.
|
||||||
IRFetcher interface {
|
IRFetcher interface {
|
||||||
InnerRingKeys() (keys.PublicKeys, error)
|
InnerRingKeys(ctx context.Context) (keys.PublicKeys, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
FrostFSClient interface {
|
FrostFSClient interface {
|
||||||
|
@ -64,7 +64,7 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
MainnetClient interface {
|
MainnetClient interface {
|
||||||
NeoFSAlphabetList() (res keys.PublicKeys, err error)
|
NeoFSAlphabetList(context.Context) (res keys.PublicKeys, err error)
|
||||||
GetDesignateHash() util.Uint160
|
GetDesignateHash() util.Uint160
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -294,7 +294,7 @@ type testNodeStateSettings struct {
|
||||||
maintAllowed bool
|
maintAllowed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testNodeStateSettings) MaintenanceModeAllowed() error {
|
func (s *testNodeStateSettings) MaintenanceModeAllowed(context.Context) error {
|
||||||
if s.maintAllowed {
|
if s.maintAllowed {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -303,7 +303,7 @@ func (s *testNodeStateSettings) MaintenanceModeAllowed() error {
|
||||||
|
|
||||||
type testValidator struct{}
|
type testValidator struct{}
|
||||||
|
|
||||||
func (v *testValidator) VerifyAndUpdate(*netmap.NodeInfo) error {
|
func (v *testValidator) VerifyAndUpdate(context.Context, *netmap.NodeInfo) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,7 +381,7 @@ func (c *testNetmapClient) ContractAddress() util.Uint160 {
|
||||||
return c.contractAddress
|
return c.contractAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testNetmapClient) EpochDuration() (uint64, error) {
|
func (c *testNetmapClient) EpochDuration(context.Context) (uint64, error) {
|
||||||
return c.epochDuration, nil
|
return c.epochDuration, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,7 +392,7 @@ func (c *testNetmapClient) MorphTxHeight(h util.Uint256) (uint32, error) {
|
||||||
return 0, fmt.Errorf("not found")
|
return 0, fmt.Errorf("not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testNetmapClient) NetMap() (*netmap.NetMap, error) {
|
func (c *testNetmapClient) NetMap(context.Context) (*netmap.NetMap, error) {
|
||||||
return c.netmap, nil
|
return c.netmap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package locode
|
package locode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -29,7 +30,7 @@ var errMissingRequiredAttr = errors.New("missing required attribute in DB record
|
||||||
// - Continent: R.Continent().String().
|
// - Continent: R.Continent().String().
|
||||||
//
|
//
|
||||||
// UN-LOCODE attribute remains untouched.
|
// UN-LOCODE attribute remains untouched.
|
||||||
func (v *Validator) VerifyAndUpdate(n *netmap.NodeInfo) error {
|
func (v *Validator) VerifyAndUpdate(_ context.Context, n *netmap.NodeInfo) error {
|
||||||
attrLocode := n.LOCODE()
|
attrLocode := n.LOCODE()
|
||||||
if attrLocode == "" {
|
if attrLocode == "" {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package locode_test
|
package locode_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -92,7 +93,7 @@ func TestValidator_VerifyAndUpdate(t *testing.T) {
|
||||||
t.Run("w/o locode", func(t *testing.T) {
|
t.Run("w/o locode", func(t *testing.T) {
|
||||||
n := nodeInfoWithSomeAttrs()
|
n := nodeInfoWithSomeAttrs()
|
||||||
|
|
||||||
err := validator.VerifyAndUpdate(n)
|
err := validator.VerifyAndUpdate(context.Background(), n)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -102,7 +103,7 @@ func TestValidator_VerifyAndUpdate(t *testing.T) {
|
||||||
|
|
||||||
addLocodeAttrValue(n, "WRONG LOCODE")
|
addLocodeAttrValue(n, "WRONG LOCODE")
|
||||||
|
|
||||||
err := validator.VerifyAndUpdate(n)
|
err := validator.VerifyAndUpdate(context.Background(), n)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -111,7 +112,7 @@ func TestValidator_VerifyAndUpdate(t *testing.T) {
|
||||||
|
|
||||||
addLocodeAttr(n, locodestd.LOCODE{"RU", "SPB"})
|
addLocodeAttr(n, locodestd.LOCODE{"RU", "SPB"})
|
||||||
|
|
||||||
err := validator.VerifyAndUpdate(n)
|
err := validator.VerifyAndUpdate(context.Background(), n)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -119,7 +120,7 @@ func TestValidator_VerifyAndUpdate(t *testing.T) {
|
||||||
|
|
||||||
addLocodeAttr(n, r.LOCODE)
|
addLocodeAttr(n, r.LOCODE)
|
||||||
|
|
||||||
err := validator.VerifyAndUpdate(n)
|
err := validator.VerifyAndUpdate(context.Background(), n)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, rec.CountryCode().String(), n.Attribute("CountryCode"))
|
require.Equal(t, rec.CountryCode().String(), n.Attribute("CountryCode"))
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package maddress
|
package maddress
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -8,7 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// VerifyAndUpdate calls network.VerifyAddress.
|
// VerifyAndUpdate calls network.VerifyAddress.
|
||||||
func (v *Validator) VerifyAndUpdate(n *netmap.NodeInfo) error {
|
func (v *Validator) VerifyAndUpdate(_ context.Context, n *netmap.NodeInfo) error {
|
||||||
err := network.VerifyMultiAddress(*n)
|
err := network.VerifyMultiAddress(*n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not verify multiaddress: %w", err)
|
return fmt.Errorf("could not verify multiaddress: %w", err)
|
||||||
|
|
|
@ -7,6 +7,7 @@ map candidates.
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -23,7 +24,7 @@ type NetworkSettings interface {
|
||||||
// no error if allowed;
|
// no error if allowed;
|
||||||
// ErrMaintenanceModeDisallowed if disallowed;
|
// ErrMaintenanceModeDisallowed if disallowed;
|
||||||
// other error if there are any problems with the check.
|
// other error if there are any problems with the check.
|
||||||
MaintenanceModeAllowed() error
|
MaintenanceModeAllowed(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NetMapCandidateValidator represents tool which checks state of nodes which
|
// NetMapCandidateValidator represents tool which checks state of nodes which
|
||||||
|
@ -55,13 +56,13 @@ func (x *NetMapCandidateValidator) SetNetworkSettings(netSettings NetworkSetting
|
||||||
// MUST NOT be called before SetNetworkSettings.
|
// MUST NOT be called before SetNetworkSettings.
|
||||||
//
|
//
|
||||||
// See also netmap.NodeInfo.IsOnline/SetOnline and other similar methods.
|
// See also netmap.NodeInfo.IsOnline/SetOnline and other similar methods.
|
||||||
func (x *NetMapCandidateValidator) VerifyAndUpdate(node *netmap.NodeInfo) error {
|
func (x *NetMapCandidateValidator) VerifyAndUpdate(ctx context.Context, node *netmap.NodeInfo) error {
|
||||||
if node.Status().IsOnline() {
|
if node.Status().IsOnline() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if node.Status().IsMaintenance() {
|
if node.Status().IsMaintenance() {
|
||||||
return x.netSettings.MaintenanceModeAllowed()
|
return x.netSettings.MaintenanceModeAllowed(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
return errors.New("invalid status: MUST be either ONLINE or MAINTENANCE")
|
return errors.New("invalid status: MUST be either ONLINE or MAINTENANCE")
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package state_test
|
package state_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
||||||
|
@ -13,7 +14,7 @@ type testNetworkSettings struct {
|
||||||
disallowed bool
|
disallowed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x testNetworkSettings) MaintenanceModeAllowed() error {
|
func (x testNetworkSettings) MaintenanceModeAllowed(context.Context) error {
|
||||||
if x.disallowed {
|
if x.disallowed {
|
||||||
return state.ErrMaintenanceModeDisallowed
|
return state.ErrMaintenanceModeDisallowed
|
||||||
}
|
}
|
||||||
|
@ -81,7 +82,7 @@ func TestValidator_VerifyAndUpdate(t *testing.T) {
|
||||||
testCase.validatorPreparer(&v)
|
testCase.validatorPreparer(&v)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := v.VerifyAndUpdate(&node)
|
err := v.VerifyAndUpdate(context.Background(), &node)
|
||||||
|
|
||||||
if testCase.valid {
|
if testCase.valid {
|
||||||
require.NoError(t, err, testCase.name)
|
require.NoError(t, err, testCase.name)
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package nodevalidation
|
package nodevalidation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
||||||
apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
)
|
)
|
||||||
|
@ -26,9 +28,9 @@ func New(validators ...netmap.NodeValidator) *CompositeValidator {
|
||||||
// VerifyAndUpdate passes apinetmap.NodeInfo to wrapped validators.
|
// VerifyAndUpdate passes apinetmap.NodeInfo to wrapped validators.
|
||||||
//
|
//
|
||||||
// If error appears, returns it immediately.
|
// If error appears, returns it immediately.
|
||||||
func (c *CompositeValidator) VerifyAndUpdate(ni *apinetmap.NodeInfo) error {
|
func (c *CompositeValidator) VerifyAndUpdate(ctx context.Context, ni *apinetmap.NodeInfo) error {
|
||||||
for _, v := range c.validators {
|
for _, v := range c.validators {
|
||||||
if err := v.VerifyAndUpdate(ni); err != nil {
|
if err := v.VerifyAndUpdate(ctx, ni); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
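CompositeValidator, whose VerifyAndUpdate now also takes a context, fans a single netmap candidate out to every wrapped validator and stops at the first failure. A hedged wiring sketch built on the New constructor visible in the hunk header; the three validator variables are placeholders for the locode, maddress, and state validators changed elsewhere in this diff, whose real constructors take configuration:

    // Hypothetical composition; the validator values are placeholders.
    v := nodevalidation.New(
        maddressValidator, // multiaddress syntax check
        locodeValidator,   // UN-LOCODE attribute check
        stateValidator,    // ONLINE/MAINTENANCE status check
    )
    if err := v.VerifyAndUpdate(ctx, &nodeInfo); err != nil {
        // the first failing validator short-circuits the rest
    }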
@@ -14,7 +14,7 @@ import (
 func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoch) bool {
     epoch := ev.EpochNumber()
 
-    epochDuration, err := np.netmapClient.EpochDuration()
+    epochDuration, err := np.netmapClient.EpochDuration(ctx)
     if err != nil {
         np.log.Warn(ctx, logs.NetmapCantGetEpochDuration,
             zap.Error(err))
@@ -37,7 +37,7 @@ func (np *Processor) processNewEpoch(ctx context.Context, ev netmapEvent.NewEpoc
     }
 
     // get new netmap snapshot
-    networkMap, err := np.netmapClient.NetMap()
+    networkMap, err := np.netmapClient.NetMap(ctx)
     if err != nil {
         np.log.Warn(ctx, logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
             zap.Error(err))
@@ -39,7 +39,7 @@ func (np *Processor) processAddPeer(ctx context.Context, ev netmapEvent.AddPeer)
     }
 
     // validate and update node info
-    err = np.nodeValidator.VerifyAndUpdate(&nodeInfo)
+    err = np.nodeValidator.VerifyAndUpdate(ctx, &nodeInfo)
     if err != nil {
         np.log.Warn(ctx, logs.NetmapCouldNotVerifyAndUpdateInformationAboutNetworkMapCandidate,
             zap.Error(err),
@@ -108,7 +108,7 @@ func (np *Processor) processUpdatePeer(ctx context.Context, ev netmapEvent.Updat
     var err error
 
     if ev.Maintenance() {
-        err = np.nodeStateSettings.MaintenanceModeAllowed()
+        err = np.nodeStateSettings.MaintenanceModeAllowed(ctx)
         if err != nil {
             np.log.Info(ctx, logs.NetmapPreventSwitchingNodeToMaintenanceState,
                 zap.Error(err),
@@ -49,15 +49,15 @@ type (
         //
         // If no error occurs, the parameter must point to the
         // ready-made NodeInfo structure.
-        VerifyAndUpdate(*netmap.NodeInfo) error
+        VerifyAndUpdate(context.Context, *netmap.NodeInfo) error
     }
 
     Client interface {
         MorphNotaryInvoke(ctx context.Context, contract util.Uint160, fee fixedn.Fixed8, nonce uint32, vub *uint32, method string, args ...any) error
         ContractAddress() util.Uint160
-        EpochDuration() (uint64, error)
+        EpochDuration(ctx context.Context) (uint64, error)
         MorphTxHeight(h util.Uint256) (res uint32, err error)
-        NetMap() (*netmap.NetMap, error)
+        NetMap(ctx context.Context) (*netmap.NetMap, error)
         NewEpoch(ctx context.Context, epoch uint64) error
         MorphIsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error)
         MorphNotarySignAndInvokeTX(mainTx *transaction.Transaction) error
@@ -34,16 +34,16 @@ func (w *netmapClientWrapper) ContractAddress() util.Uint160 {
     return w.netmapClient.ContractAddress()
 }
 
-func (w *netmapClientWrapper) EpochDuration() (uint64, error) {
-    return w.netmapClient.EpochDuration()
+func (w *netmapClientWrapper) EpochDuration(ctx context.Context) (uint64, error) {
+    return w.netmapClient.EpochDuration(ctx)
 }
 
 func (w *netmapClientWrapper) MorphTxHeight(h util.Uint256) (res uint32, err error) {
     return w.netmapClient.Morph().TxHeight(h)
 }
 
-func (w *netmapClientWrapper) NetMap() (*netmap.NetMap, error) {
-    return w.netmapClient.NetMap()
+func (w *netmapClientWrapper) NetMap(ctx context.Context) (*netmap.NetMap, error) {
+    return w.netmapClient.NetMap(ctx)
 }
 
 func (w *netmapClientWrapper) NewEpoch(ctx context.Context, epoch uint64) error {
@@ -60,7 +60,7 @@ func (s *Server) IsAlphabet(ctx context.Context) bool {
 // InnerRingIndex is a getter for a global index of node in inner ring list. Negative
 // index means that node is not in the inner ring list.
 func (s *Server) InnerRingIndex(ctx context.Context) int {
-    index, err := s.statusIndex.InnerRingIndex()
+    index, err := s.statusIndex.InnerRingIndex(ctx)
     if err != nil {
         s.log.Error(ctx, logs.InnerringCantGetInnerRingIndex, zap.Error(err))
         return -1
@@ -72,7 +72,7 @@ func (s *Server) InnerRingIndex(ctx context.Context) int {
 // InnerRingSize is a getter for a global size of inner ring list. This value
 // paired with inner ring index.
 func (s *Server) InnerRingSize(ctx context.Context) int {
-    size, err := s.statusIndex.InnerRingSize()
+    size, err := s.statusIndex.InnerRingSize(ctx)
     if err != nil {
         s.log.Error(ctx, logs.InnerringCantGetInnerRingSize, zap.Error(err))
         return 0
@@ -84,7 +84,7 @@ func (s *Server) InnerRingSize(ctx context.Context) int {
 // AlphabetIndex is a getter for a global index of node in alphabet list.
 // Negative index means that node is not in the alphabet list.
 func (s *Server) AlphabetIndex(ctx context.Context) int {
-    index, err := s.statusIndex.AlphabetIndex()
+    index, err := s.statusIndex.AlphabetIndex(ctx)
     if err != nil {
         s.log.Error(ctx, logs.InnerringCantGetAlphabetIndex, zap.Error(err))
         return -1
@@ -6,7 +6,6 @@ import (
     "syscall"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -94,7 +93,6 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
         b.log.Debug(ctx, logs.BlobovniczaObjectWasRemovedFromBucket,
             zap.String("binary size", stringifyByteSize(dataSize)),
             zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
-            zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
         )
         b.itemDeleted(recordSize)
     }
@@ -41,8 +41,11 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
     }
 
     eg, egCtx := errgroup.WithContext(ctx)
-    eg.SetLimit(b.blzInitWorkerCount)
-    err = b.iterateIncompletedRebuildDBPaths(egCtx, func(p string) (bool, error) {
+    if b.blzInitWorkerCount > 0 {
+        eg.SetLimit(b.blzInitWorkerCount + 1)
+    }
+    eg.Go(func() error {
+        return b.iterateIncompletedRebuildDBPaths(egCtx, func(p string) (bool, error) {
         eg.Go(func() error {
             p = strings.TrimSuffix(p, rebuildSuffix)
             shBlz := b.getBlobovniczaWithoutCaching(p)
@@ -65,11 +68,7 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
             })
             return false, nil
         })
-    if err != nil {
-        _ = eg.Wait()
-        return err
-    }
+    })
 
     return eg.Wait()
 }
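The initializeDBs rework above moves the directory walk that schedules per-database init goroutines into the same errgroup as the workers. Two details are worth spelling out: the limit is raised by one because the walker itself occupies a slot (with SetLimit(n) the walker would leave only n-1 workers), and a single eg.Wait() now collects both walker and worker errors, replacing the old drain-then-return dance (`_ = eg.Wait(); return err`). A standalone sketch under invented helper names:

    package sketch

    import (
        "context"

        "golang.org/x/sync/errgroup"
    )

    // initAll runs initOne for every path produced by listPaths.
    // listPaths and initOne are hypothetical stand-ins for the tree helpers.
    func initAll(ctx context.Context, workers int,
        listPaths func(yield func(string)) error,
        initOne func(context.Context, string) error,
    ) error {
        eg, egCtx := errgroup.WithContext(ctx)
        if workers > 0 {
            // +1: the producer goroutine below also counts against the limit.
            eg.SetLimit(workers + 1)
        }
        eg.Go(func() error {
            return listPaths(func(p string) {
                eg.Go(func() error { return initOne(egCtx, p) })
            })
        })
        // One wait point for the producer and all workers; first error wins.
        return eg.Wait()
    }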
@@ -2,6 +2,9 @@ package blobovniczatree
 
 import (
     "context"
+    "os"
+    "path"
+    "strconv"
     "testing"
 
     objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -129,3 +132,34 @@ func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
 
     require.NoError(t, blz.Close(context.Background()))
 }
+
+func TestInitBlobovniczasInitErrorType(t *testing.T) {
+    t.Parallel()
+
+    rootDir := t.TempDir()
+
+    for idx := 0; idx < 10; idx++ {
+        f, err := os.Create(path.Join(rootDir, strconv.FormatInt(int64(idx), 10)+".db"))
+        require.NoError(t, err)
+        _, err = f.Write([]byte("invalid db"))
+        require.NoError(t, err)
+        require.NoError(t, f.Close())
+
+        f, err = os.Create(path.Join(rootDir, strconv.FormatInt(int64(idx), 10)+".db"+rebuildSuffix))
+        require.NoError(t, err)
+        require.NoError(t, f.Close())
+    }
+
+    blz := NewBlobovniczaTree(
+        context.Background(),
+        WithBlobovniczaShallowDepth(1),
+        WithBlobovniczaShallowWidth(1),
+        WithRootPath(rootDir),
+    )
+
+    require.NoError(t, blz.Open(mode.ComponentReadWrite))
+    err := blz.Init()
+    require.Contains(t, err.Error(), "open blobovnicza")
+    require.Contains(t, err.Error(), "invalid database")
+    require.NoError(t, blz.Close(context.Background()))
+}
@@ -10,7 +10,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -19,7 +18,10 @@ import (
     "go.uber.org/zap"
 )
 
-var errObjectIsDeleteProtected = errors.New("object is delete protected")
+var (
+    errObjectIsDeleteProtected = errors.New("object is delete protected")
+    deleteRes                  = common.DeleteRes{}
+)
 
 // Delete deletes object from blobovnicza tree.
 //
@@ -43,17 +45,17 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
     defer span.End()
 
     if b.readOnly {
-        return common.DeleteRes{}, common.ErrReadOnly
+        return deleteRes, common.ErrReadOnly
     }
 
     if b.rebuildGuard.TryRLock() {
         defer b.rebuildGuard.RUnlock()
     } else {
-        return common.DeleteRes{}, errRebuildInProgress
+        return deleteRes, errRebuildInProgress
     }
 
     if b.deleteProtectedObjects.Contains(prm.Address) {
-        return common.DeleteRes{}, errObjectIsDeleteProtected
+        return deleteRes, errObjectIsDeleteProtected
     }
 
     var bPrm blobovnicza.DeletePrm
@@ -83,7 +85,6 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
                 b.log.Debug(ctx, logs.BlobovniczatreeCouldNotRemoveObjectFromLevel,
                     zap.String("level", p),
                     zap.Error(err),
-                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
                 )
             }
         }
@@ -98,7 +99,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
 
     if err == nil && !objectFound {
         // not found in any blobovnicza
-        return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+        return deleteRes, logicerr.Wrap(new(apistatus.ObjectNotFound))
     }
     success = err == nil
 
@@ -112,7 +113,7 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
     shBlz := b.getBlobovnicza(ctx, blzPath)
     blz, err := shBlz.Open(ctx)
     if err != nil {
-        return common.DeleteRes{}, err
+        return deleteRes, err
     }
     defer shBlz.Close(ctx)
 
@@ -122,5 +123,5 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
 // removes object from blobovnicza and returns common.DeleteRes.
 func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) {
     _, err := blz.Delete(ctx, prm)
-    return common.DeleteRes{}, err
+    return deleteRes, err
 }
@@ -8,7 +8,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     "go.opentelemetry.io/otel/attribute"
@@ -57,8 +56,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
             if !client.IsErrObjectNotFound(err) {
                 b.log.Debug(ctx, logs.BlobovniczatreeCouldNotGetObjectFromLevel,
                     zap.String("level", p),
-                    zap.Error(err),
-                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+                    zap.Error(err))
             }
         }
 
@@ -10,7 +10,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -70,7 +69,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
                 b.log.Debug(ctx, logs.BlobovniczatreeCouldNotGetObjectFromLevel,
                     zap.String("level", p),
                     zap.Error(err),
-                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
                 )
             }
         }
@@ -11,7 +11,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -71,8 +70,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
             if !outOfBounds && !client.IsErrObjectNotFound(err) {
                 b.log.Debug(ctx, logs.BlobovniczatreeCouldNotGetObjectFromLevel,
                     zap.String("level", p),
-                    zap.Error(err),
-                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+                    zap.Error(err))
             }
             if outOfBounds {
                 return true, err
@@ -249,6 +249,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
 }
 
 func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
+    select {
+    case <-ctx.Done():
+        return false, ctx.Err()
+    default:
+    }
+
     sysPath := filepath.Join(b.rootPath, path)
     entries, err := os.ReadDir(sysPath)
     if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
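The select added at the top of iterateSordedDBPathsInternal is the standard non-blocking cancellation probe: one cheap check per recursion level, so a cancelled request aborts a deep tree walk promptly instead of reading every remaining directory. The same idea in a self-contained sketch (walk is an invented name):

    package sketch

    import (
        "context"
        "os"
        "path/filepath"
    )

    // walk descends into dir, checking for cancellation once per level.
    func walk(ctx context.Context, dir string) error {
        select {
        case <-ctx.Done():
            return ctx.Err() // stop before touching the filesystem again
        default:
        }
        entries, err := os.ReadDir(dir)
        if err != nil {
            return err
        }
        for _, e := range entries {
            if e.IsDir() {
                if err := walk(ctx, filepath.Join(dir, e.Name())); err != nil {
                    return err
                }
            }
        }
        return nil
    }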
@@ -9,7 +9,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
@@ -83,16 +82,14 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
             i.B.reportError(ctx, logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
         } else {
             i.B.log.Debug(ctx, logs.BlobovniczatreeCouldNotGetActiveBlobovnicza,
-                zap.Error(err),
-                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+                zap.Error(err))
         }
 
         return false, nil
     }
 
     if active == nil {
-        i.B.log.Debug(ctx, logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath),
-            zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+        i.B.log.Debug(ctx, logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
         return false, nil
     }
     defer active.Close(ctx)
@@ -106,8 +103,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
         } else {
             i.B.log.Debug(ctx, logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
                 zap.String("path", active.SystemPath()),
-                zap.Error(err),
-                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+                zap.Error(err))
         }
         if errors.Is(err, blobovnicza.ErrNoSpace) {
             i.AllFull = true
@@ -7,7 +7,6 @@ import (
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
-    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
@@ -75,8 +74,7 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
     for _, err := range errors[:len(errors)-1] {
         b.log.Warn(ctx, logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
             zap.Stringer("address", prm.Address),
-            zap.Error(err),
-            zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+            zap.Error(err))
     }
 
     return common.ExistsRes{}, errors[len(errors)-1]
@@ -136,6 +136,6 @@ func (w *genericWriter) removeWithCounter(p string, size uint64) error {
     if err := os.Remove(p); err != nil {
         return err
     }
-    w.fileCounter.Dec(uint64(size))
+    w.fileCounter.Dec(size)
     return nil
 }
@@ -69,10 +69,13 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
     if err != nil {
         return err
     }
+    written := 0
     tmpPath := "/proc/self/fd/" + strconv.FormatUint(uint64(fd), 10)
     n, err := unix.Write(fd, data)
-    if err == nil {
-        if n == len(data) {
+    for err == nil {
+        written += n
+
+        if written == len(data) {
             err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
             if err == nil {
                 w.fileCounter.Inc(uint64(len(data)))
@@ -80,9 +83,23 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
             if errors.Is(err, unix.EEXIST) {
                 err = nil
             }
-        } else {
-            err = errors.New("incomplete write")
+            break
         }
+
+        // From man 2 write:
+        // https://www.man7.org/linux/man-pages/man2/write.2.html
+        //
+        // Note that a successful write() may transfer fewer than count
+        // bytes. Such partial writes can occur for various reasons; for
+        // example, because there was insufficient space on the disk device
+        // to write all of the requested bytes, or because a blocked write()
+        // to a socket, pipe, or similar was interrupted by a signal handler
+        // after it had transferred some, but before it had transferred all
+        // of the requested bytes. In the event of a partial write, the
+        // caller can make another write() call to transfer the remaining
+        // bytes. The subsequent call will either transfer further bytes or
+        // may result in an error (e.g., if the disk is now full).
+        n, err = unix.Write(fd, data[written:])
     }
     errClose := unix.Close(fd)
     if err != nil {
@@ -114,7 +131,7 @@ func (w *linuxWriter) removeFile(p string, size uint64) error {
         return logicerr.Wrap(new(apistatus.ObjectNotFound))
     }
     if err == nil {
-        w.fileCounter.Dec(uint64(size))
+        w.fileCounter.Dec(size)
     }
     return err
 }
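The writeFile change above replaces the old single-shot write, which treated any short count as an "incomplete write" error, with a loop that keeps writing from the first unwritten byte, as man 2 write recommends. The core of that loop, reduced to a hedged standalone helper (assuming golang.org/x/sys/unix; error handling trimmed for brevity):

    // writeAll issues write(2) until data is drained or an error occurs.
    func writeAll(fd int, data []byte) error {
        written := 0
        for written < len(data) {
            n, err := unix.Write(fd, data[written:])
            if err != nil {
                return err
            }
            written += n // a short write is legal; retry the remaining tail
        }
        return nil
    }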
@@ -0,0 +1,42 @@
+//go:build linux && integration
+
+package fstree
+
+import (
+    "context"
+    "errors"
+    "os"
+    "testing"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
+    "github.com/stretchr/testify/require"
+    "golang.org/x/sys/unix"
+)
+
+func TestENOSPC(t *testing.T) {
+    dir, err := os.MkdirTemp(t.TempDir(), "ramdisk")
+    require.NoError(t, err)
+
+    f, err := os.CreateTemp(t.TempDir(), "ramdisk_*")
+    require.NoError(t, err)
+
+    err = unix.Mount(f.Name(), dir, "tmpfs", 0, "size=1M")
+    if errors.Is(err, unix.EPERM) {
+        t.Skipf("skip size tests: no permission to mount: %v", err)
+        return
+    }
+    require.NoError(t, err)
+    defer func() {
+        require.NoError(t, unix.Unmount(dir, 0))
+    }()
+
+    fst := New(WithPath(dir), WithDepth(1))
+    require.NoError(t, fst.Open(mode.ComponentReadWrite))
+    require.NoError(t, fst.Init())
+
+    _, err = fst.Put(context.Background(), common.PutPrm{
+        RawData: make([]byte, 10<<20),
+    })
+    require.ErrorIs(t, err, common.ErrNoSpace)
+}
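A usage note on the new test: it is gated behind the `linux && integration` build tags, mounts a 1 MiB tmpfs, and writes a 10 MiB object to force ENOSPC; without permission to mount it skips itself. An assumed invocation (the package path is inferred from the imports, not stated in the diff):

    sudo go test -tags integration -run TestENOSPC ./pkg/local_object_storage/blobstor/fstree/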
@@ -133,11 +133,11 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
 		elem := common.IterationElement{
 			ObjectData: v,
 		}
-		if err := elem.Address.DecodeString(string(k)); err != nil {
+		if err := elem.Address.DecodeString(k); err != nil {
 			if req.IgnoreErrors {
 				continue
 			}
-			return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err))
+			return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, k, err))
 		}
 		var err error
 		if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {

@@ -48,8 +48,8 @@ func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm)
 	defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()
 
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.containerSize(ctx, prm)
-		return err
+		res = e.containerSize(ctx, prm)
+		return nil
 	})
 
 	return
@@ -69,7 +69,7 @@ func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, er
 	return res.Size(), nil
 }
 
-func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
+func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes) {
 	e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
 		var csPrm shard.ContainerSizePrm
 		csPrm.SetContainerID(prm.cnr)
@@ -96,8 +96,8 @@ func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm)
 	defer elapsed("ListContainers", e.metrics.AddMethodDuration)()
 
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.listContainers(ctx)
-		return err
+		res = e.listContainers(ctx)
+		return nil
 	})
 
 	return
@@ -115,7 +115,7 @@ func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) {
 	return res.Containers(), nil
 }
 
-func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) {
+func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes {
 	uniqueIDs := make(map[string]cid.ID)
 
 	e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
@@ -142,5 +142,5 @@ func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes,
 
 	return ListContainersRes{
 		containers: result,
-	}, nil
+	}
 }

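The refactor above drops error returns from helpers that can never fail, leaving the public wrapper to report only the blocked-execution error; such always-nil returns are the kind of thing linters like unparam flag. A minimal sketch of the wrapper pattern, with simplified types that are illustrative rather than the real frostfs-node API:

	package example

	import (
		"errors"
		"sync"
	)

	type engine struct {
		mtx     sync.RWMutex
		blocked error // non-nil while executions are blocked
	}

	// execIfNotBlocked runs op only while executions are not blocked.
	func (e *engine) execIfNotBlocked(op func() error) error {
		e.mtx.RLock()
		defer e.mtx.RUnlock()
		if e.blocked != nil {
			return errors.New("executions are blocked: " + e.blocked.Error())
		}
		return op()
	}

	// listContainers cannot fail, so it returns its result directly.
	func (e *engine) listContainers() []string { return nil }

	// ListContainers keeps an error return solely for the blocked case.
	func (e *engine) ListContainers() (res []string, err error) {
		err = e.execIfNotBlocked(func() error {
			res = e.listContainers()
			return nil
		})
		return
	}
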
@@ -6,7 +6,6 @@ import (
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
-	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -24,9 +23,6 @@ type DeletePrm struct {
 	forceRemoval bool
 }
 
-// DeleteRes groups the resulting values of Delete operation.
-type DeleteRes struct{}
-
 // WithAddress is a Delete option to set the addresses of the objects to delete.
 //
 // Option is required.
@@ -51,7 +47,7 @@ func (p *DeletePrm) WithForceRemoval() {
 // NOTE: Marks any object to be deleted (despite any prohibitions
 // on operations with that object) if WithForceRemoval option has
 // been provided.
-func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRes, err error) {
+func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) error {
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Delete",
 		trace.WithAttributes(
 			attribute.String("address", prm.addr.EncodeToString()),
@@ -60,15 +56,12 @@ func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRe
 	defer span.End()
 	defer elapsed("Delete", e.metrics.AddMethodDuration)()
 
-	err = e.execIfNotBlocked(func() error {
-		res, err = e.delete(ctx, prm)
-		return err
+	return e.execIfNotBlocked(func() error {
+		return e.delete(ctx, prm)
 	})
-
-	return
 }
 
-func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
+func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) error {
 	var locked struct {
 		is bool
 	}
@@ -126,14 +119,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 	})
 
 	if locked.is {
-		return DeleteRes{}, new(apistatus.ObjectLocked)
+		return new(apistatus.ObjectLocked)
 	}
 
 	if splitInfo != nil {
 		e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID())
 	}
 
-	return DeleteRes{}, nil
+	return nil
 }
 
 func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) {
@@ -154,8 +147,7 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
 		if err != nil {
 			e.log.Warn(ctx, logs.EngineErrorDuringSearchingForObjectChildren,
 				zap.Stringer("addr", addr),
-				zap.Error(err),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			return false
 		}
 
@@ -166,8 +158,7 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
 		if err != nil {
 			e.log.Debug(ctx, logs.EngineCouldNotInhumeObjectInShard,
 				zap.Stringer("addr", addr),
-				zap.Error(err),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			continue
 		}
 	}
@@ -196,8 +187,7 @@ func (e *StorageEngine) deleteChunks(
 		if err != nil {
 			e.log.Debug(ctx, logs.EngineCouldNotInhumeObjectInShard,
 				zap.Stringer("addr", addr),
-				zap.Error(err),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			continue
 		}
 	}

@@ -70,8 +70,7 @@ func TestDeleteBigObject(t *testing.T) {
 	deletePrm.WithForceRemoval()
 	deletePrm.WithAddress(addrParent)
 
-	_, err := e.Delete(context.Background(), deletePrm)
-	require.NoError(t, err)
+	require.NoError(t, e.Delete(context.Background(), deletePrm))
 
 	checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
 	checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
@@ -141,8 +140,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
 	deletePrm.WithForceRemoval()
 	deletePrm.WithAddress(addrParent)
 
-	_, err := e.Delete(context.Background(), deletePrm)
-	require.NoError(t, err)
+	require.NoError(t, e.Delete(context.Background(), deletePrm))
 
 	checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
 	checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
@@ -153,7 +151,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
 	// delete physical
 	var delPrm shard.DeletePrm
 	delPrm.SetAddresses(addrParent)
-	_, err = s1.Delete(context.Background(), delPrm)
+	_, err := s1.Delete(context.Background(), delPrm)
 	require.NoError(t, err)
 
 	delPrm.SetAddresses(addrLink)

@@ -279,7 +279,7 @@ func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (
 		return true, nil
 	}
 
-	wasRemoved, err := container.WasRemoved(s.cs, id)
+	wasRemoved, err := container.WasRemoved(ctx, s.cs, id)
 	if err != nil {
 		return false, err
 	}

@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -14,7 +15,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
@@ -255,8 +255,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
 	copyShards := func() []pooledShard {
 		mtx.RLock()
 		defer mtx.RUnlock()
-		t := make([]pooledShard, len(shards))
-		copy(t, shards)
+		t := slices.Clone(shards)
 		return t
 	}
 	eg.Go(func() error {
@@ -284,12 +283,12 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
 	}()
 
 	e.log.Info(ctx, logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
-		zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
+		zap.Stringer("scope", prm.Scope))
 
 	err = e.getTotals(ctx, prm, shardsToEvacuate, res)
 	if err != nil {
 		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
+			zap.Stringer("scope", prm.Scope))
 		return err
 	}
 
@@ -323,7 +322,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
 	}
 	if err != nil {
 		e.log.Error(ctx, logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
+			zap.Stringer("scope", prm.Scope))
 		return err
 	}
 
@@ -426,7 +425,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
 	}
 	egContainer.Go(func() error {
 		var skip bool
-		c, err := e.containerSource.Load().cs.Get(cnt)
+		c, err := e.containerSource.Load().cs.Get(ctx, cnt)
 		if err != nil {
 			if client.IsErrContainerNotFound(err) {
 				skip = true
@@ -480,8 +479,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
 	err := sh.IterateOverContainers(ctx, cntPrm)
 	if err != nil {
 		cancel(err)
-		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField)
 	}
 	return err
 }
@@ -540,7 +538,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
 			e.log.Debug(ctx, logs.EngineShardsEvacuationTreeEvacuatedLocal,
 				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
 				zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID),
-				evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				evacuationOperationLogField)
 			res.trEvacuated.Add(1)
 			continue
 		}
@@ -550,26 +548,26 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
 			e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveTree,
 				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
 				zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
-				zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			return err
 		}
 		if moved {
 			e.log.Debug(ctx, logs.EngineShardsEvacuationTreeEvacuatedRemote,
 				zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
 				zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
-				evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				evacuationOperationLogField)
 			res.trEvacuated.Add(1)
 		} else if prm.IgnoreErrors {
 			res.trFailed.Add(1)
 			e.log.Warn(ctx, logs.EngineShardsEvacuationFailedToMoveTree,
 				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
 				zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
-				zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 		} else {
 			e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveTree,
 				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
 				zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
-				zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			return fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", contTree.TreeID, contTree.CID)
 		}
 	}
@@ -724,7 +722,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
 	shards := make([]pooledShard, 0, len(e.shards))
 	for id := range e.shards {
 		shards = append(shards, pooledShard{
-			hashedShard: hashedShard(e.shards[id]),
+			hashedShard: e.shards[id],
 			pool:        e.shardPools[id],
 		})
 	}
@@ -756,8 +754,7 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI
 			res.objFailed.Add(1)
 			return nil
 		}
-		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
 		return err
 	}
 
@@ -778,16 +775,14 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI
 
 	moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
 	if err != nil {
-		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		e.log.Error(ctx, logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
 		return err
 	}
 	if moved {
 		res.objEvacuated.Add(1)
 	} else if prm.IgnoreErrors {
 		res.objFailed.Add(1)
-		e.log.Warn(ctx, logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		e.log.Warn(ctx, logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
 	} else {
 		return fmt.Errorf("object %s was not replicated", addr)
 	}
@@ -825,8 +820,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
 			zap.Stringer("from", sh.ID()),
 			zap.Stringer("to", shards[j].ID()),
 			zap.Stringer("addr", addr),
-			evacuationOperationLogField,
-			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			evacuationOperationLogField)
 		return true, nil
 	case putToShardExists, putToShardRemoved:
 		res.objSkipped.Add(1)

@@ -3,6 +3,7 @@ package engine
 import (
 	"context"
 	"fmt"
+	"slices"
 	"sync"
 	"time"
 
@@ -123,8 +124,7 @@ func (s *EvacuationState) DeepCopy() *EvacuationState {
 	if s == nil {
 		return nil
 	}
-	shardIDs := make([]string, len(s.shardIDs))
-	copy(shardIDs, s.shardIDs)
+	shardIDs := slices.Clone(s.shardIDs)
 
 	return &EvacuationState{
 		shardIDs: shardIDs,

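Both this DeepCopy hunk and the copyShards change in the Evacuate diff above replace the make-plus-copy idiom with slices.Clone from the standard library (Go 1.21+). A tiny self-contained equivalence sketch:

	package example

	import "slices"

	func cloneIDs(ids []string) []string {
		// Identical in effect to:
		//   t := make([]string, len(ids))
		//   copy(t, ids)
		return slices.Clone(ids)
	}
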
@@ -37,7 +37,7 @@ type containerStorage struct {
 	latency time.Duration
 }
 
-func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
+func (cs *containerStorage) Get(ctx context.Context, id cid.ID) (*coreContainer.Container, error) {
 	time.Sleep(cs.latency)
 	v, ok := cs.cntmap[id]
 	if !ok {
@@ -49,7 +49,7 @@ func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
 	return &coreCnt, nil
 }
 
-func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
+func (cs *containerStorage) DeletionInfo(context.Context, cid.ID) (*coreContainer.DelInfo, error) {
 	return nil, nil
 }
 

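These test-helper changes mirror an interface change: container lookups now accept a context.Context, so cancellation and tracing propagate through storage callbacks. A hedged sketch of the shape of such an interface (simplified signatures, not the real container source API):

	package example

	import "context"

	// Source is a simplified stand-in for a container source whose
	// methods now thread a context through to the underlying storage.
	type Source interface {
		Get(ctx context.Context, id string) ([]byte, error)
		DeletionInfo(ctx context.Context, id string) ([]byte, error)
	}
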
@@ -8,7 +8,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -107,8 +106,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
 			e.log.Warn(ctx, logs.ShardMetaInfoPresentButObjectNotFound,
 				zap.Stringer("shard_id", it.ShardWithMeta.ID()),
 				zap.Error(it.MetaError),
-				zap.Stringer("address", prm.addr),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Stringer("address", prm.addr))
 		}
 	}

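This diff, like several around it, removes the per-call zap.String("trace_id", ...) fields. One plausible way to keep trace IDs in logs without repeating the field at every call site is to enrich the logger once per request from the OpenTelemetry span context; this is a sketch of that idea, not necessarily what frostfs-node does instead:

	package example

	import (
		"context"

		"go.opentelemetry.io/otel/trace"
		"go.uber.org/zap"
	)

	// withTrace returns a logger carrying the trace_id of the span in
	// ctx, if any, so individual log calls no longer need to attach it.
	func withTrace(ctx context.Context, l *zap.Logger) *zap.Logger {
		if sc := trace.SpanContextFromContext(ctx); sc.IsValid() {
			return l.With(zap.String("trace_id", sc.TraceID().String()))
		}
		return l
	}
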
@@ -7,7 +7,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
-	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -27,9 +26,6 @@ type InhumePrm struct {
 	forceRemoval bool
 }
 
-// InhumeRes encapsulates results of inhume operation.
-type InhumeRes struct{}
-
 // WithTarget sets a list of objects that should be inhumed and tombstone address
 // as the reason for inhume operation.
 //
@@ -67,23 +63,20 @@ var errInhumeFailure = errors.New("inhume operation failed")
 // with that object) if WithForceRemoval option has been provided.
 //
 // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
+func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) error {
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Inhume")
 	defer span.End()
 	defer elapsed("Inhume", e.metrics.AddMethodDuration)()
 
-	err = e.execIfNotBlocked(func() error {
-		res, err = e.inhume(ctx, prm)
-		return err
+	return e.execIfNotBlocked(func() error {
+		return e.inhume(ctx, prm)
 	})
-
-	return
 }
 
-func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
+func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error {
 	addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
 	if err != nil {
-		return InhumeRes{}, err
+		return err
 	}
 
 	var shPrm shard.InhumePrm
@@ -105,9 +98,8 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
 			e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard,
 				zap.Error(errors.New("this shard was expected to exist")),
 				zap.String("shard_id", shardID),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
 			)
-			return InhumeRes{}, errInhumeFailure
+			return errInhumeFailure
 		}
 
 		if _, err := sh.Inhume(ctx, shPrm); err != nil {
@@ -119,11 +111,11 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
 			default:
 				e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
 			}
-			return InhumeRes{}, err
+			return err
 		}
 	}
 
-	return InhumeRes{}, nil
+	return nil
 }
 
 // groupObjectsByShard groups objects based on the shard(s) they are stored on.
@@ -203,7 +195,6 @@ func (e *StorageEngine) findShards(ctx context.Context, addr oid.Address, checkL
 			e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
 				zap.Error(err),
 				zap.Stringer("address", addr),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
 			)
 		} else if isLocked {
 			retErr = new(apistatus.ObjectLocked)
@@ -238,8 +229,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		locked, err = h.Shard.IsLocked(ctx, addr)
 		if err != nil {
-			e.reportShardError(ctx, h, "can't check object's lockers", err, zap.Stringer("address", addr),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			e.reportShardError(ctx, h, "can't check object's lockers", err, zap.Stringer("address", addr))
 			outErr = err
 			return false
 		}
@@ -268,8 +258,7 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I
 	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
 		locks, err := h.Shard.GetLocks(ctx, addr)
 		if err != nil {
-			e.reportShardError(ctx, h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			e.reportShardError(ctx, h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr))
 			outErr = err
 		}
 		allLocks = append(allLocks, locks...)

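Call sites migrate mechanically once Inhume stops returning a result value, as the test hunks below show; the pattern is simply:

	// before: the result value was always empty and discarded
	_, err = e.Inhume(ctx, prm)
	// after
	err = e.Inhume(ctx, prm)
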
@@ -55,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
 		var inhumePrm InhumePrm
 		inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
 
-		_, err = e.Inhume(context.Background(), inhumePrm)
+		err = e.Inhume(context.Background(), inhumePrm)
 		require.NoError(t, err)
 
 		addrs, err := Select(context.Background(), e, cnr, false, fs)
@@ -85,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
 		var inhumePrm InhumePrm
 		inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
 
-		_, err = e.Inhume(context.Background(), inhumePrm)
+		err = e.Inhume(context.Background(), inhumePrm)
 		require.NoError(t, err)
 
 		addrs, err := Select(context.Background(), e, cnr, false, fs)
@@ -128,7 +128,7 @@ func TestStorageEngine_ECInhume(t *testing.T) {
 
 	var inhumePrm InhumePrm
 	inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.NoError(t, err)
 
 	var alreadyRemoved *apistatus.ObjectAlreadyRemoved
@@ -173,7 +173,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
 
 		var prm InhumePrm
 		prm.WithTarget(ts, object.AddressOf(obj))
-		_, err := engine.Inhume(context.Background(), prm)
+		err := engine.Inhume(context.Background(), prm)
 		require.NoError(t, err)
 	})
 
@@ -182,7 +182,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
 
 		var prm InhumePrm
 		prm.MarkAsGarbage(object.AddressOf(obj))
-		_, err := engine.Inhume(context.Background(), prm)
+		err := engine.Inhume(context.Background(), prm)
 		require.NoError(t, err)
 	})
 }
@@ -237,7 +237,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
 		prm.WithTarget(ts, addrs...)
 
 		b.StartTimer()
-		_, err := engine.Inhume(context.Background(), prm)
+		err := engine.Inhume(context.Background(), prm)
 		require.NoError(b, err)
 		b.StopTimer()
 	}

@@ -114,7 +114,7 @@ func TestLockUserScenario(t *testing.T) {
 	inhumePrm.WithTarget(tombAddr, objAddr)
 
 	var objLockedErr *apistatus.ObjectLocked
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.ErrorAs(t, err, &objLockedErr)
 
 	// 4.
@@ -127,7 +127,7 @@ func TestLockUserScenario(t *testing.T) {
 
 	inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
 
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
 
 	// 5.
@@ -136,7 +136,7 @@ func TestLockUserScenario(t *testing.T) {
 	inhumePrm.WithTarget(tombAddr, objAddr)
 
 	require.Eventually(t, func() bool {
-		_, err = e.Inhume(context.Background(), inhumePrm)
+		err = e.Inhume(context.Background(), inhumePrm)
 		return err == nil
 	}, 30*time.Second, time.Second)
 }
@@ -200,7 +200,7 @@ func TestLockExpiration(t *testing.T) {
 	inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
 
 	var objLockedErr *apistatus.ObjectLocked
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.ErrorAs(t, err, &objLockedErr)
 
 	// 3.
@@ -212,7 +212,7 @@ func TestLockExpiration(t *testing.T) {
 	inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
 
 	require.Eventually(t, func() bool {
-		_, err = e.Inhume(context.Background(), inhumePrm)
+		err = e.Inhume(context.Background(), inhumePrm)
 		return err == nil
 	}, 30*time.Second, time.Second)
 }
@@ -270,12 +270,12 @@ func TestLockForceRemoval(t *testing.T) {
 	inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
 
 	var objLockedErr *apistatus.ObjectLocked
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.ErrorAs(t, err, &objLockedErr)
 
 	inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
 
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.ErrorAs(t, err, &objLockedErr)
 
 	// 4.
@@ -283,13 +283,12 @@ func TestLockForceRemoval(t *testing.T) {
 	deletePrm.WithAddress(objectcore.AddressOf(lock))
 	deletePrm.WithForceRemoval()
 
-	_, err = e.Delete(context.Background(), deletePrm)
-	require.NoError(t, err)
+	require.NoError(t, e.Delete(context.Background(), deletePrm))
 
 	// 5.
 	inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
 
-	_, err = e.Inhume(context.Background(), inhumePrm)
+	err = e.Inhume(context.Background(), inhumePrm)
 	require.NoError(t, err)
 }

@@ -9,7 +9,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
-	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
@@ -143,8 +142,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 		} else {
 			e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
 				zap.Stringer("shard_id", sh.ID()),
-				zap.Error(err),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 		}
 
 		return // this is not ErrAlreadyRemoved error so we can go to the next shard
@@ -165,15 +163,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 			errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
 			e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
 				zap.Stringer("shard_id", sh.ID()),
-				zap.Error(err),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			return
 		}
 		if client.IsErrObjectAlreadyRemoved(err) {
 			e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
 				zap.Stringer("shard_id", sh.ID()),
-				zap.Error(err),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Error(err))
 			res.status = putToShardRemoved
 			res.err = err
 			return

@@ -9,7 +9,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -119,8 +118,7 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error
 			e.log.Warn(ctx, logs.ShardMetaInfoPresentButObjectNotFound,
 				zap.Stringer("shard_id", it.ShardWithMeta.ID()),
 				zap.Error(it.MetaError),
-				zap.Stringer("address", prm.addr),
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				zap.Stringer("address", prm.addr))
 		}
 	}

@@ -54,19 +54,17 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe
 	defer elapsed("Select", e.metrics.AddMethodDuration)()
 
 	err = e.execIfNotBlocked(func() error {
-		res, err = e._select(ctx, prm)
-		return err
+		res = e._select(ctx, prm)
+		return nil
 	})
 
 	return
 }
 
-func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
+func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes {
 	addrList := make([]oid.Address, 0)
 	uniqueMap := make(map[string]struct{})
 
-	var outError error
-
 	var shPrm shard.SelectPrm
 	shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
 	shPrm.SetFilters(prm.filters)
@@ -90,7 +88,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
 
 	return SelectRes{
 		addrList: addrList,
-	}, outError
+	}
 }
 
 // List returns `limit` available physically storage object addresses in engine.
@@ -100,14 +98,14 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
 func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
 	defer elapsed("List", e.metrics.AddMethodDuration)()
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.list(ctx, limit)
-		return err
+		res = e.list(ctx, limit)
+		return nil
 	})
 
 	return
 }
 
-func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) {
+func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes {
 	addrList := make([]oid.Address, 0, limit)
 	uniqueMap := make(map[string]struct{})
 	ln := uint64(0)
@@ -136,7 +134,7 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, erro
 
 	return SelectRes{
 		addrList: addrList,
-	}, nil
+	}
 }
 
 // Select selects objects from local storage using provided filters.

@@ -272,7 +272,7 @@ func (e *StorageEngine) sortShards(objAddr interface{ EncodeToString() string })
 	h := hrw.StringHash(objAddr.EncodeToString())
 	shards := make([]hashedShard, 0, len(e.shards))
 	for _, sh := range e.shards {
-		shards = append(shards, hashedShard(sh))
+		shards = append(shards, sh)
 	}
 	hrw.SortHasherSliceByValue(shards, h)
 	return shards
@@ -285,7 +285,7 @@ func (e *StorageEngine) unsortedShards() []hashedShard {
 	shards := make([]hashedShard, 0, len(e.shards))
 
 	for _, sh := range e.shards {
-		shards = append(shards, hashedShard(sh))
+		shards = append(shards, sh)
 	}
 
 	return shards

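The dropped hashedShard(sh) conversions are no-ops: the ranged values evidently already have type hashedShard, so converting them to their own type changes nothing, which is exactly what the unconvert linter reports. A minimal illustration with simplified, hypothetical types:

	package example

	type hashedShard struct{ id string }

	func collect(src map[string]hashedShard) []hashedShard {
		shards := make([]hashedShard, 0, len(src))
		for _, sh := range src {
			// Before: shards = append(shards, hashedShard(sh)), a redundant
			// conversion, since sh is already a hashedShard.
			shards = append(shards, sh)
		}
		return shards
	}
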
Some files were not shown because too many files have changed in this diff.