Compare commits

..

4 commits

Author SHA1 Message Date
10574f8ff0 [#1607] adm/ape: Fix parsing policy-hash flag
All checks were successful
DCO action / DCO (pull_request) Successful in 37s
Vulncheck / Vulncheck (pull_request) Successful in 59s
Build / Build Components (pull_request) Successful in 1m24s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m28s
Tests and linters / Run gofumpt (pull_request) Successful in 2m37s
Tests and linters / Tests (pull_request) Successful in 2m56s
Tests and linters / Lint (pull_request) Successful in 3m3s
Tests and linters / gopls check (pull_request) Successful in 3m50s
Tests and linters / Tests with -race (pull_request) Successful in 3m53s
Tests and linters / Staticcheck (pull_request) Successful in 6m11s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-03-03 11:14:48 +03:00
01b19e72fa [#1607] adm/ape: Extend kind flag to accept integer
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-03-03 09:53:08 +03:00
39faefb175 [#1607] adm/ape: Adopt policy reader
Embed https://git.frostfs.info/dkirillov/policy-reader
tool to 'frostfs-adm morph ape' command

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-03-03 09:53:08 +03:00
e88646e99b [#1607] adm/ape: Update target-type description
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-03-03 09:53:08 +03:00
68 changed files with 927 additions and 1641 deletions

View file

@ -0,0 +1,96 @@
package chains
import (
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
policycontract "git.frostfs.info/TrueCloudLab/frostfs-contract/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/ape/raw/output"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
apeCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/ape"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/management"
"github.com/spf13/cobra"
)
var listContainerCmd = &cobra.Command{
Use: "list-container",
Short: "List container related (namespace) policies",
Long: "List container related (namespace) policies along with filtering by service (s3/storage)",
Example: `chains list-container -r http://localhost:40332 list --container 7h7NcXcF6k6b1yidqEHc1jkyXUm1MfUDrrTuHAefhiDe
chains list-container -r http://localhost:40332 --policy-hash 81c1a41d09e08087a4b679418b12be5d3ab15742 list --container 7h7NcXcF6k6b1yidqEHc1jkyXUm1MfUDrrTuHAefhiDe --namespace test`,
RunE: runListContainerCmd,
}
const (
containerFlag = "container"
)
func initListContainerCmd() {
listContainerCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
listContainerCmd.Flags().String(apeCmd.ChainNameFlag, "", apeCmd.ChainNameFlagDesc)
listContainerCmd.Flags().String(containerFlag, "", "Container id or bucket name in nns (if name is provided than 'namespace' should be set too)")
listContainerCmd.Flags().String(namespaceFlag, "", "Namespace where container name will be looked up")
listContainerCmd.Flags().Bool(decodeChainFlag, false, "Use this flag to decode chain")
listContainerCmd.Flags().Bool(decodeIDFlag, false, "Use this flag to additionally decode chain id (without --decode-chain no take effect)")
_ = listContainerCmd.MarkFlagRequired(containerFlag)
}
func runListContainerCmd(cmd *cobra.Command, _ []string) error {
chainName := parseChainName(cmd)
namespace := parseNamespace(cmd)
inv, policyHash, _ := initReaders(cmd)
cnrID := parseContainer(cmd, inv, namespace)
printContainer(cmd, namespace, cnrID)
res, err := commonclient.ReadIteratorItems(inv, 100, policyHash, methodIteratorChainsByPrefix, big.NewInt(int64(policycontract.Namespace)), namespace, string(chainName))
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
decodeChain, _ := cmd.Flags().GetBool(decodeChainFlag)
decodeID, _ := cmd.Flags().GetBool(decodeIDFlag)
cmd.Printf("\nnamespace policies: %d\n", len(res))
err = output.PrintChains(cmd, res, decodeChain, decodeID)
commonCmd.ExitOnErr(cmd, "can't print chains: %w", err)
res, err = commonclient.ReadIteratorItems(inv, 100, policyHash, methodIteratorChainsByPrefix, big.NewInt(int64(policycontract.Container)), cnrID.EncodeToString(), string(chainName))
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
cmd.Printf("\ncontainer policies: %d\n", len(res))
return output.PrintChains(cmd, res, decodeChain, decodeID)
}
func printContainer(cmd *cobra.Command, namespace string, cnrID cid.ID) {
cmd.Println("container:")
cmd.Printf("\tnamespace: '%s'\n", namespace)
cmd.Printf("\tCID: '%s'\n", cnrID.EncodeToString())
}
func parseContainer(cmd *cobra.Command, inv *invoker.Invoker, namespace string) cid.ID {
containerName, _ := cmd.Flags().GetString(containerFlag)
var cnrID cid.ID
if err := cnrID.DecodeString(containerName); err == nil {
return cnrID
}
var domain container.Domain
domain.SetName(containerName)
if namespace != "" {
domain.SetZone(namespace + ".ns")
}
nnsCs, err := helper.GetContractByID(management.NewReader(inv), 1)
commonCmd.ExitOnErr(cmd, "can't get NNS contract state: %w", err)
cnrID, err = helper.NNSResolveContainerDomain(inv, nnsCs.Hash, domain.Name()+"."+domain.Zone())
commonCmd.ExitOnErr(cmd, "can't resolve container id: %w", err)
return cnrID
}

View file

@ -0,0 +1,203 @@
package chains
import (
"encoding/hex"
"errors"
"fmt"
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
ffsidclient "git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
policycontract "git.frostfs.info/TrueCloudLab/frostfs-contract/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/ape/raw/output"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
apeCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/ape"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
neoflags "github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/management"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var listUserCmd = &cobra.Command{
Use: "list-user",
Short: "List user related (groups/namespace) policies",
Long: "List user related (groups/namespace) policies along with filtering by service (s3/storage)",
Example: `policy-reader list-user -r http://localhost:40332 list --user NiGqBpUdMvAC68SxUeyYwVPyBCsqzNuof
policy-reader list-user -r http://localhost:40332 --policy-hash 81c1a41d09e08087a4b679418b12be5d3ab15742 list --user NiGqBpUdMvAC68SxUeyYwVPyBCsqzNuofL --service s3`,
RunE: runListCmd,
}
var errUnknownChainNameType = errors.New("unknown chain-name")
const (
userFlag = "user"
namespaceFlag = "namespace"
decodeChainFlag = "decode-chain"
decodeIDFlag = "decode-id"
)
const methodIteratorChainsByPrefix = "iteratorChainsByPrefix"
func initListUserCmd() {
listUserCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
listUserCmd.Flags().String(apeCmd.ChainNameFlag, "", apeCmd.ChainNameFlagDesc)
listUserCmd.Flags().String(userFlag, "", "User address or name in frostfsid contract (if name is provided than 'namespace' should be set too)")
listUserCmd.Flags().String(namespaceFlag, "", "Namespace where user name will be looked up")
listUserCmd.Flags().Bool(decodeChainFlag, false, "Use this flag to decode chain")
listUserCmd.Flags().Bool(decodeIDFlag, false, "Use this flag to additionally decode chain id (without --decode-chain no take effect)")
_ = listUserCmd.MarkFlagRequired(userFlag)
}
func runListCmd(cmd *cobra.Command, _ []string) error {
chainName := parseChainName(cmd)
namespace := parseNamespace(cmd)
inv, policyHash, ffsidCli := initReaders(cmd)
user, _ := cmd.Flags().GetString(userFlag)
subj, err := resolveSubject(ffsidCli, namespace, user)
commonCmd.ExitOnErr(cmd, "can't resolve frostfsid subject: %w", err)
printSubject(cmd, subj)
res, err := commonclient.ReadIteratorItems(inv, 100, policyHash, methodIteratorChainsByPrefix, big.NewInt(int64(policycontract.Namespace)), subj.Namespace, string(chainName))
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
decodeChain, _ := cmd.Flags().GetBool(decodeChainFlag)
decodeID, _ := cmd.Flags().GetBool(decodeIDFlag)
cmd.Printf("\nnamespace policies: %d\n", len(res))
err = output.PrintChains(cmd, res, decodeChain, decodeID)
commonCmd.ExitOnErr(cmd, "can't print chains: %w", err)
userEntity := big.NewInt(int64(policycontract.User))
userEntityName := fmt.Sprintf("%s:%s", subj.Namespace, subj.PrimaryKey.Address())
res, err = commonclient.ReadIteratorItems(inv, 100, policyHash, methodIteratorChainsByPrefix, userEntity, userEntityName, string(chainName))
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
cmd.Printf("\nuser policies: %d\n", len(res))
err = output.PrintChains(cmd, res, decodeChain, decodeID)
commonCmd.ExitOnErr(cmd, "can't print chains: %w", err)
cmd.Printf("\ngroup policies: %d\n", len(subj.Groups))
groupEntity := big.NewInt(int64(policycontract.Group))
for _, group := range subj.Groups {
groupEntityName := fmt.Sprintf("%s:%d", group.Namespace, group.ID)
res, err = commonclient.ReadIteratorItems(inv, 100, policyHash, methodIteratorChainsByPrefix, groupEntity, groupEntityName, string(chainName))
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
cmd.Printf("user group '%s' (id: %d) policies: %d\n", group.Name, group.ID, len(res))
err = output.PrintChains(cmd, res, decodeChain, decodeID)
commonCmd.ExitOnErr(cmd, "can't print chains: %w", err)
}
return nil
}
func resolveSubject(ffsid *ffsidclient.Client, namespace, userName string) (*ffsidclient.SubjectExtended, error) {
if userHash, err := neoflags.ParseAddress(userName); err == nil {
subj, err := ffsid.GetSubject(userHash)
if err != nil {
return nil, err
}
return ffsid.GetSubjectExtended(subj.PrimaryKey.GetScriptHash())
}
subj, err := ffsid.GetSubjectByName(namespace, userName)
if err != nil {
return nil, err
}
return ffsid.GetSubjectExtended(subj.PrimaryKey.GetScriptHash())
}
func parseChainName(cmd *cobra.Command) apechain.Name {
chainName, _ := cmd.Flags().GetString(apeCmd.ChainNameFlag)
switch chainName {
case "":
return ""
case "s3":
return apechain.S3
case "ingress":
return apechain.Ingress
}
commonCmd.ExitOnErr(cmd, "can't parse chain-name: %w", errUnknownChainNameType)
panic("unreachable")
}
func parseNamespace(cmd *cobra.Command) string {
namespace, _ := cmd.Flags().GetString(namespaceFlag)
if namespace == "root" {
namespace = ""
}
return namespace
}
func printSubject(cmd *cobra.Command, subj *ffsidclient.SubjectExtended) {
cmd.Println("subject:")
cmd.Printf("\tnamespace: '%s'\n", subj.Namespace)
cmd.Printf("\tname: '%s'\n", subj.Name)
cmd.Printf("\tkey: '%s'\n", hex.EncodeToString(subj.PrimaryKey.Bytes()))
cmd.Printf("\tadditional keys:\n")
for _, key := range subj.AdditionalKeys {
cmd.Printf("\t\t%s\n", hex.EncodeToString(key.Bytes()))
}
cmd.Printf("\tclaims:\n")
for k, v := range subj.KV {
cmd.Printf("\t\t%s: '%s'\n", k, v)
}
cmd.Printf("\tgroups:\n")
for _, gr := range subj.Groups {
cmd.Printf("\t\t%d: '%s'\n", gr.ID, gr.Name)
}
}
func initReaders(cmd *cobra.Command) (*invoker.Invoker, util.Uint160, *ffsidclient.Client) {
endpoint := viper.GetString(commonflags.EndpointFlag)
rpcCli, err := rpcclient.New(cmd.Context(), endpoint, rpcclient.Options{})
commonCmd.ExitOnErr(cmd, "can't init rpc client: %w", err)
inv := invoker.New(rpcCli, nil)
nnsCs, err := helper.GetContractByID(management.NewReader(inv), 1)
commonCmd.ExitOnErr(cmd, "can't get NNS contract state: %w", err)
policyHashStr, _ := cmd.Flags().GetString(policyHashFlag)
policyHash, err := util.Uint160DecodeStringLE(policyHashStr)
if err != nil {
policyHash, err = helper.NNSResolveHash(inv, nnsCs.Hash, policyHashStr)
commonCmd.ExitOnErr(cmd, "can't resolve NNS policy contract: %w", err)
}
frostfsidHashStr, _ := cmd.Flags().GetString(frostfsidHashFlag)
frostfsidHash, err := util.Uint160DecodeStringLE(policyHashStr)
if err != nil {
frostfsidHash, err = helper.NNSResolveHash(inv, nnsCs.Hash, frostfsidHashStr)
commonCmd.ExitOnErr(cmd, "can't resolve NNS frostfsid contract: %w", err)
}
acc, err := wallet.NewAccount()
commonCmd.ExitOnErr(cmd, "can't create new account: %w", err)
ffsidCli, err := ffsidclient.New(rpcCli, acc, frostfsidHash, ffsidclient.Options{})
commonCmd.ExitOnErr(cmd, "can't init frostfsid client: %w", err)
return inv, policyHash, ffsidCli
}

View file

@ -0,0 +1,32 @@
package chains
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var Cmd = &cobra.Command{
Use: "chains",
Short: "Chain related policy operations",
Long: "Chain related policy operations. Complex scenarios like: list all user chains (including groups, namespaces).",
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.EndpointFlag, cmd.Flags().Lookup(commonflags.EndpointFlag))
},
}
const (
policyHashFlag = "policy-hash"
frostfsidHashFlag = "frostfsid-hash"
)
func init() {
Cmd.PersistentFlags().String(policyHashFlag, "policy.frostfs", "NNS name or script hash of policy contract")
Cmd.PersistentFlags().String(frostfsidHashFlag, "frostfsid.frostfs", "NNS name or script hash of frostfsid contract")
Cmd.AddCommand(listUserCmd)
initListUserCmd()
Cmd.AddCommand(listContainerCmd)
initListContainerCmd()
}

View file

@ -0,0 +1,74 @@
package raw
import (
"encoding/base64"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var listChainNamesCmd = &cobra.Command{
Use: "list-chain-names",
Short: "Invoke 'listChainNames' method",
Long: "Invoke 'listChainNames' method in policy contract and print results to stdout",
Example: `raw -r http://localhost:40332 list-chain-names --kind n --name ''
raw -r http://localhost:40332 --policy-hash 81c1a41d09e08087a4b679418b12be5d3ab15742 list-chain-names --kind c --name 7h7NcXcF6k6b1yidqEHc1jkyXUm1MfUDrrTuHAefhiDe`,
RunE: runListChainNamesCmd,
}
const (
nameFlag = "name"
nameBase64Flag = "name-base64"
)
func initListChainNamesCmd() {
listChainNamesCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
listChainNamesCmd.Flags().String(kindFlag, "n", "Target kind (1-byte) to list (n(namespace)/c(container)/g(group)/u(user)/i(iam)) or its integer representation")
listChainNamesCmd.Flags().String(nameFlag, "", "Target name to list")
listChainNamesCmd.Flags().Bool(nameBase64Flag, false, "Use this flag if you provide name in base64 format")
_ = listChainNamesCmd.MarkFlagRequired(kindFlag)
_ = listChainNamesCmd.MarkFlagRequired(nameFlag)
}
func runListChainNamesCmd(cmd *cobra.Command, _ []string) error {
kind, _ := cmd.Flags().GetString(kindFlag)
entity, err := parseTargetKind(kind)
commonCmd.ExitOnErr(cmd, "can't parse target kind: %w", err)
entityName, err := parseEntityName(cmd)
commonCmd.ExitOnErr(cmd, "can't parse name: %w", err)
inv, policyHash := initPolicyReader(cmd)
res, err := commonclient.ReadIteratorItems(inv, 100, policyHash, methodListChainNames, entity, entityName)
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
cmd.Printf("%s target chain names: %d\n", kind, len(res))
for _, re := range res {
bytes, err := re.TryBytes()
commonCmd.ExitOnErr(cmd, "can't parse result: %w", err)
cmd.Printf("%s\t(base64: '%s')\n", string(bytes), base64.StdEncoding.EncodeToString(bytes))
}
return nil
}
func parseEntityName(cmd *cobra.Command) ([]byte, error) {
entityNameStr, _ := cmd.Flags().GetString(nameFlag)
var entityName []byte
if viper.GetBool(nameBase64Flag) {
return base64.StdEncoding.DecodeString(entityNameStr)
}
if entityNameStr == "root" {
entityNameStr = ""
}
entityName = []byte(entityNameStr)
return entityName, nil
}

View file

@ -0,0 +1,71 @@
package raw
import (
"encoding/base64"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/ape/raw/output"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"github.com/spf13/cobra"
)
var listChainsByPrefixCmd = &cobra.Command{
Use: "list-chains-by-prefix",
Short: "Invoke 'iteratorChainsByPrefix' method",
Long: "Invoke 'iteratorChainsByPrefix' method in policy contract and print results to stdout",
Example: `raw -r http://localhost:40332 list-chains-by-prefix --kind n --name ''
raw -r http://localhost:40332 --policy-hash 81c1a41d09e08087a4b679418b12be5d3ab15742 list-chains-by-prefix --kind c --name 7h7NcXcF6k6b1yidqEHc1jkyXUm1MfUDrrTuHAefhiDe`,
RunE: runListChainsByPrefixCmd,
}
const (
prefixFlag = "prefix"
prefixBase64Flag = "prefix-base64"
decodeChainFlag = "decode-chain"
decodeIDFlag = "decode-id"
)
func initListChainsByPrefixCmd() {
listChainsByPrefixCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
listChainsByPrefixCmd.Flags().String(kindFlag, "n", "Target kind (1-byte) to list (n(namespace)/c(container)/g(group)/u(user)/i(iam)) or its integer representation")
listChainsByPrefixCmd.Flags().String(nameFlag, "", "Target name to list")
listChainsByPrefixCmd.Flags().String(prefixFlag, "", "Prefix to list")
listChainsByPrefixCmd.Flags().Bool(prefixBase64Flag, false, "Use this flag if you provide prefix in base64 format")
listChainsByPrefixCmd.Flags().Bool(nameBase64Flag, false, "Use this flag if you provide name in base64 format")
listChainsByPrefixCmd.Flags().Bool(decodeChainFlag, false, "Use this flag to decode chain")
listChainsByPrefixCmd.Flags().Bool(decodeIDFlag, false, "Use this flag to additionally decode chain id (without --decode-chain no take effect)")
_ = listChainsByPrefixCmd.MarkFlagRequired(kindFlag)
_ = listChainsByPrefixCmd.MarkFlagRequired(nameFlag)
}
func runListChainsByPrefixCmd(cmd *cobra.Command, _ []string) error {
kind, _ := cmd.Flags().GetString(kindFlag)
entity, err := parseTargetKind(kind)
commonCmd.ExitOnErr(cmd, "can't parse target kind: %w", err)
entityName, err := parseEntityName(cmd)
commonCmd.ExitOnErr(cmd, "can't parse name: %w", err)
prefixStr, _ := cmd.Flags().GetString(prefixFlag)
prefixBase64, _ := cmd.Flags().GetBool(prefixBase64Flag)
var prefix []byte
if prefixBase64 {
if prefix, err = base64.StdEncoding.DecodeString(prefixStr); err != nil {
return err
}
} else {
prefix = []byte(prefixStr)
}
inv, policyHash := initPolicyReader(cmd)
res, err := commonclient.ReadIteratorItems(inv, 100, policyHash, methodIteratorChainsByPrefix, entity, entityName, prefix)
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
cmd.Printf("%s target chains names: %d\n", kind, len(res))
decodeChain, _ := cmd.Flags().GetBool(decodeChainFlag)
decodeID, _ := cmd.Flags().GetBool(decodeIDFlag)
return output.PrintChains(cmd, res, decodeChain, decodeID)
}

View file

@ -0,0 +1,100 @@
package raw
import (
"encoding/base64"
"fmt"
"math/big"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/management"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var listTargetsCmd = &cobra.Command{
Use: "list-targets",
Short: "Invoke 'listTargets' method",
Long: "Invoke 'listTargets' method in policy contract and print results to stdout",
Example: `raw -r http://localhost:40332 list-targets
raw -r http://localhost:40332 --policy-hash 81c1a41d09e08087a4b679418b12be5d3ab15742 list-targets --kind c
raw -r http://localhost:40332 --policy-hash 81c1a41d09e08087a4b679418b12be5d3ab15742 list-targets --kind 99`,
RunE: runListTargetsCmd,
}
const (
kindFlag = "kind"
)
const (
methodIteratorChainsByPrefix = "iteratorChainsByPrefix"
methodListTargets = "listTargets"
methodListChainNames = "listChainNames"
)
func initListTargetsCmd() {
listTargetsCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
listTargetsCmd.Flags().String(kindFlag, "n", "Target kind (1-byte) to list (n(namespace)/c(container)/g(group)/u(user)/i(iam)) or its integer representation")
}
func runListTargetsCmd(cmd *cobra.Command, _ []string) error {
kind, _ := cmd.Flags().GetString(kindFlag)
entity, err := parseTargetKind(kind)
commonCmd.ExitOnErr(cmd, "can't parse target kind: %w", err)
inv, policyHash := initPolicyReader(cmd)
res, err := commonclient.ReadIteratorItems(inv, 100, policyHash, methodListTargets, entity)
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
cmd.Printf("%s targets: %d\n", kind, len(res))
for _, re := range res {
bytes, err := re.TryBytes()
commonCmd.ExitOnErr(cmd, "can't parse result: %w", err)
cmd.Printf("%s\t(base64: '%s')\n", string(bytes), base64.StdEncoding.EncodeToString(bytes))
}
return nil
}
func parseTargetKind(typ string) (*big.Int, error) {
val, err := strconv.ParseInt(typ, 10, 64)
if err == nil {
return big.NewInt(val), nil
}
if len(typ) != 1 {
return nil, fmt.Errorf("invalid type: %s", typ)
}
return big.NewInt(int64(typ[0])), nil
}
func initPolicyReader(cmd *cobra.Command) (*invoker.Invoker, util.Uint160) {
endpoint := viper.GetString(commonflags.EndpointFlag)
rpcCli, err := rpcclient.New(cmd.Context(), endpoint, rpcclient.Options{})
commonCmd.ExitOnErr(cmd, "can't init rpc client: %w", err)
inv := invoker.New(rpcCli, nil)
policyHashStr, _ := cmd.Flags().GetString(policyHashFlag)
if policyHash, err := util.Uint160DecodeStringLE(policyHashStr); err == nil {
return inv, policyHash
}
nnsCs, err := helper.GetContractByID(management.NewReader(inv), 1)
commonCmd.ExitOnErr(cmd, "can't get NNS contract state: %w", err)
policyHash, err := helper.NNSResolveHash(inv, nnsCs.Hash, policyHashStr)
commonCmd.ExitOnErr(cmd, "can't resolve NNS policy contract: %w", err)
return inv, policyHash
}

View file

@ -0,0 +1,56 @@
package output
import (
"encoding/base64"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/spf13/cobra"
)
const (
minPrintable = 32
maxPrintable = 127
)
func PrintChains(cmd *cobra.Command, list []stackitem.Item, decodeChain, decodeID bool) error {
for _, item := range list {
bytes, err := item.TryBytes()
if err != nil {
return err
}
if !decodeChain {
cmd.Printf("\t%s\n", string(bytes))
continue
}
var chain apechain.Chain
if err = chain.DecodeBytes(bytes); err != nil {
cmd.PrintErrf("invalid chain format: %s\n", base64.StdEncoding.EncodeToString(bytes))
continue
}
raw, err := chain.MarshalJSON()
if err != nil {
return err
}
if decodeID {
var printableID string
for _, r := range string(chain.ID) {
if minPrintable <= r && r <= maxPrintable {
printableID += string(r)
} else {
printableID += "."
}
}
cmd.Printf("\t%s - %s\n", printableID, string(raw))
} else {
cmd.Printf("\t%s\n", string(raw))
}
}
return nil
}

View file

@ -0,0 +1,32 @@
package raw
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var Cmd = &cobra.Command{
Use: "raw",
Short: "FrostFS policy contract raw reader",
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.EndpointFlag, cmd.Flags().Lookup(commonflags.EndpointFlag))
},
}
const (
policyHashFlag = "policy-hash"
)
func init() {
Cmd.PersistentFlags().String(policyHashFlag, "policy.frostfs", "NNS name or script hash of policy contract")
Cmd.AddCommand(listTargetsCmd)
initListTargetsCmd()
Cmd.AddCommand(listChainNamesCmd)
initListChainNamesCmd()
Cmd.AddCommand(listChainsByPrefixCmd)
initListChainsByPrefixCmd()
}

View file

@ -1,6 +1,10 @@
package ape
import "github.com/spf13/cobra"
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/ape/chains"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/ape/raw"
"github.com/spf13/cobra"
)
var Cmd = &cobra.Command{
Use: "ape",
@ -8,6 +12,9 @@ var Cmd = &cobra.Command{
}
func init() {
Cmd.AddCommand(raw.Cmd)
Cmd.AddCommand(chains.Cmd)
initAddRuleChainCmd()
initRemoveRuleChainCmd()
initListRuleChainsCmd()

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
@ -158,6 +159,40 @@ func NNSResolveHash(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (
return ParseNNSResolveResult(item)
}
// NNSResolveContainerDomain returns errMissingNNSRecord if invocation fault exception contains "token not found".
func NNSResolveContainerDomain(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (cid.ID, error) {
item, err := NNSResolve(inv, nnsHash, domain)
if err != nil {
return cid.ID{}, err
}
return parseNNSResolveResultCID(item)
}
func parseNNSResolveResultCID(res stackitem.Item) (cid.ID, error) {
arr, ok := res.Value().([]stackitem.Item)
if !ok {
arr = []stackitem.Item{res}
}
if _, ok := res.Value().(stackitem.Null); ok || len(arr) == 0 {
return cid.ID{}, errors.New("NNS record is missing")
}
var cnrID cid.ID
for i := range arr {
bs, err := arr[i].TryBytes()
if err != nil {
continue
}
if err = cnrID.DecodeString(string(bs)); err == nil {
return cnrID, nil
}
}
return cid.ID{}, errors.New("no valid CIDs are found")
}
func DomainOf(contract string) string {
return contract + ".frostfs"
}

View file

@ -28,7 +28,7 @@ const (
RPC = "rpc-endpoint"
RPCShorthand = "r"
RPCDefault = ""
RPCUsage = "Remote node address ('<host>:<port>' or 'grpcs://<host>:<port>')"
RPCUsage = "Remote node address (as 'multiaddr' or '<host>:<port>')"
Timeout = "timeout"
TimeoutShorthand = "t"

View file

@ -2,17 +2,13 @@ package meta
import (
"context"
"encoding/binary"
"errors"
"fmt"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
schemaCommon "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/schema/common"
schema "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/schema/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/tui"
"github.com/rivo/tview"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
)
var tuiCMD = &cobra.Command{
@ -31,11 +27,6 @@ Available search filters:
var initialPrompt string
var parserPerSchemaVersion = map[uint64]schemaCommon.Parser{
2: schema.MetabaseParserV2,
3: schema.MetabaseParserV3,
}
func init() {
common.AddComponentPathFlag(tuiCMD, &vPath)
@ -58,22 +49,12 @@ func runTUI(cmd *cobra.Command) error {
}
defer db.Close()
schemaVersion, hasVersion := lookupSchemaVersion(cmd, db)
if !hasVersion {
return errors.New("couldn't detect schema version")
}
metabaseParser, ok := parserPerSchemaVersion[schemaVersion]
if !ok {
return fmt.Errorf("unknown schema version %d", schemaVersion)
}
// Need if app was stopped with Ctrl-C.
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
app := tview.NewApplication()
ui := tui.NewUI(ctx, app, db, metabaseParser, nil)
ui := tui.NewUI(ctx, app, db, schema.MetabaseParser, nil)
_ = ui.AddFilter("cid", tui.CIDParser, "CID")
_ = ui.AddFilter("oid", tui.OIDParser, "OID")
@ -88,31 +69,3 @@ func runTUI(cmd *cobra.Command) error {
app.SetRoot(ui, true).SetFocus(ui)
return app.Run()
}
var (
shardInfoBucket = []byte{5}
versionRecord = []byte("version")
)
func lookupSchemaVersion(cmd *cobra.Command, db *bbolt.DB) (version uint64, ok bool) {
err := db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(shardInfoBucket)
if bkt == nil {
return nil
}
rec := bkt.Get(versionRecord)
if rec == nil {
return nil
}
version = binary.LittleEndian.Uint64(rec)
ok = true
return nil
})
if err != nil {
common.ExitOnErr(cmd, fmt.Errorf("couldn't lookup version: %w", err))
}
return
}

View file

@ -80,15 +80,10 @@ var (
},
)
UserAttributeParserV2 = NewUserAttributeKeyBucketParser(
UserAttributeParser = NewUserAttributeKeyBucketParser(
NewUserAttributeValueBucketParser(records.UserAttributeRecordParser),
)
UserAttributeParserV3 = NewUserAttributeKeyBucketParserWithSpecificKeys(
NewUserAttributeValueBucketParser(records.UserAttributeRecordParser),
[]string{"FilePath", "S3-Access-Box-CRDT-Name"},
)
PayloadHashParser = NewPrefixContainerBucketParser(PayloadHash, records.PayloadHashRecordParser, Resolvers{
cidResolver: StrictResolver,
oidResolver: StrictResolver,
@ -113,14 +108,4 @@ var (
cidResolver: StrictResolver,
oidResolver: LenientResolver,
})
ExpirationEpochToObjectParser = NewPrefixBucketParser(ExpirationEpochToObject, records.ExpirationEpochToObjectRecordParser, Resolvers{
cidResolver: LenientResolver,
oidResolver: LenientResolver,
})
ObjectToExpirationEpochParser = NewPrefixContainerBucketParser(ObjectToExpirationEpoch, records.ObjectToExpirationEpochRecordParser, Resolvers{
cidResolver: StrictResolver,
oidResolver: LenientResolver,
})
)

View file

@ -22,8 +22,6 @@ const (
Split
ContainerCounters
ECInfo
ExpirationEpochToObject
ObjectToExpirationEpoch
)
var x = map[Prefix]string{
@ -45,8 +43,6 @@ var x = map[Prefix]string{
Split: "Split",
ContainerCounters: "Container Counters",
ECInfo: "EC Info",
ExpirationEpochToObject: "Exp. Epoch to Object",
ObjectToExpirationEpoch: "Object to Exp. Epoch",
}
func (p Prefix) String() string {

View file

@ -9,7 +9,7 @@ import (
func (b *PrefixBucket) String() string {
return common.FormatSimple(
fmt.Sprintf("(%2d %-20s)", b.prefix, b.prefix), tcell.ColorLime,
fmt.Sprintf("(%2d %-18s)", b.prefix, b.prefix), tcell.ColorLime,
)
}
@ -17,7 +17,7 @@ func (b *PrefixContainerBucket) String() string {
return fmt.Sprintf(
"%s CID %s",
common.FormatSimple(
fmt.Sprintf("(%2d %-20s)", b.prefix, b.prefix), tcell.ColorLime,
fmt.Sprintf("(%2d %-18s)", b.prefix, b.prefix), tcell.ColorLime,
),
common.FormatSimple(b.id.String(), tcell.ColorAqua),
)
@ -34,7 +34,7 @@ func (b *ContainerBucket) String() string {
func (b *UserAttributeKeyBucket) String() string {
return fmt.Sprintf("%s CID %s ATTR-KEY %s",
common.FormatSimple(
fmt.Sprintf("(%2d %-20s)", b.prefix, b.prefix), tcell.ColorLime,
fmt.Sprintf("(%2d %-18s)", b.prefix, b.prefix), tcell.ColorLime,
),
common.FormatSimple(
fmt.Sprintf("%-44s", b.id), tcell.ColorAqua,

View file

@ -2,7 +2,6 @@ package buckets
import (
"errors"
"slices"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/schema/common"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -62,7 +61,6 @@ var (
ErrInvalidKeyLength = errors.New("invalid key length")
ErrInvalidValueLength = errors.New("invalid value length")
ErrInvalidPrefix = errors.New("invalid prefix")
ErrUnexpectedAttributeKey = errors.New("unexpected attribute key")
)
func NewPrefixBucketParser(prefix Prefix, next common.Parser, resolvers Resolvers) common.Parser {
@ -134,10 +132,6 @@ func NewContainerBucketParser(next common.Parser, resolvers Resolvers) common.Pa
}
func NewUserAttributeKeyBucketParser(next common.Parser) common.Parser {
return NewUserAttributeKeyBucketParserWithSpecificKeys(next, nil)
}
func NewUserAttributeKeyBucketParserWithSpecificKeys(next common.Parser, keys []string) common.Parser {
return func(key, value []byte) (common.SchemaEntry, common.Parser, error) {
if value != nil {
return nil, nil, ErrNotBucket
@ -153,11 +147,6 @@ func NewUserAttributeKeyBucketParserWithSpecificKeys(next common.Parser, keys []
return nil, nil, err
}
b.key = string(key[33:])
if len(keys) != 0 && !slices.Contains(keys, b.key) {
return nil, nil, ErrUnexpectedAttributeKey
}
return &b, next, nil
}
}

View file

@ -5,30 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/schema/metabase/buckets"
)
var MetabaseParserV3 = common.WithFallback(
common.Any(
buckets.GraveyardParser,
buckets.GarbageParser,
buckets.ContainerVolumeParser,
buckets.LockedParser,
buckets.ShardInfoParser,
buckets.PrimaryParser,
buckets.LockersParser,
buckets.TombstoneParser,
buckets.SmallParser,
buckets.RootParser,
buckets.UserAttributeParserV3,
buckets.ParentParser,
buckets.SplitParser,
buckets.ContainerCountersParser,
buckets.ECInfoParser,
buckets.ExpirationEpochToObjectParser,
buckets.ObjectToExpirationEpochParser,
),
common.RawParser.ToFallbackParser(),
)
var MetabaseParserV2 = common.WithFallback(
var MetabaseParser = common.WithFallback(
common.Any(
buckets.GraveyardParser,
buckets.GarbageParser,
@ -41,7 +18,7 @@ var MetabaseParserV2 = common.WithFallback(
buckets.SmallParser,
buckets.RootParser,
buckets.OwnerParser,
buckets.UserAttributeParserV2,
buckets.UserAttributeParser,
buckets.PayloadHashParser,
buckets.ParentParser,
buckets.SplitParser,

View file

@ -63,11 +63,3 @@ func (r *ContainerCountersRecord) DetailedString() string {
func (r *ECInfoRecord) DetailedString() string {
return spew.Sdump(*r)
}
func (r *ExpirationEpochToObjectRecord) DetailedString() string {
return spew.Sdump(*r)
}
func (r *ObjectToExpirationEpochRecord) DetailedString() string {
return spew.Sdump(*r)
}

View file

@ -143,26 +143,3 @@ func (r *ECInfoRecord) Filter(typ string, val any) common.FilterResult {
return common.No
}
}
func (r *ExpirationEpochToObjectRecord) Filter(typ string, val any) common.FilterResult {
switch typ {
case "cid":
id := val.(cid.ID)
return common.IfThenElse(r.cnt.Equals(id), common.Yes, common.No)
case "oid":
id := val.(oid.ID)
return common.IfThenElse(r.obj.Equals(id), common.Yes, common.No)
default:
return common.No
}
}
func (r *ObjectToExpirationEpochRecord) Filter(typ string, val any) common.FilterResult {
switch typ {
case "oid":
id := val.(oid.ID)
return common.IfThenElse(r.obj.Equals(id), common.Yes, common.No)
default:
return common.No
}
}

View file

@ -249,45 +249,3 @@ func ECInfoRecordParser(key, value []byte) (common.SchemaEntry, common.Parser, e
}
return &r, nil, nil
}
func ExpirationEpochToObjectRecordParser(key, _ []byte) (common.SchemaEntry, common.Parser, error) {
if len(key) != 72 {
return nil, nil, ErrInvalidKeyLength
}
var (
r ExpirationEpochToObjectRecord
err error
)
r.epoch = binary.BigEndian.Uint64(key[:8])
if err = r.cnt.Decode(key[8:40]); err != nil {
return nil, nil, err
}
if err = r.obj.Decode(key[40:]); err != nil {
return nil, nil, err
}
return &r, nil, nil
}
func ObjectToExpirationEpochRecordParser(key, value []byte) (common.SchemaEntry, common.Parser, error) {
if len(key) != 32 {
return nil, nil, ErrInvalidKeyLength
}
if len(value) != 8 {
return nil, nil, ErrInvalidValueLength
}
var (
r ObjectToExpirationEpochRecord
err error
)
if err = r.obj.Decode(key); err != nil {
return nil, nil, err
}
r.epoch = binary.LittleEndian.Uint64(value)
return &r, nil, nil
}

View file

@ -2,7 +2,6 @@ package records
import (
"fmt"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/schema/common"
"github.com/gdamore/tcell/v2"
@ -134,22 +133,3 @@ func (r *ECInfoRecord) String() string {
len(r.ids),
)
}
func (r *ExpirationEpochToObjectRecord) String() string {
return fmt.Sprintf(
"exp. epoch %s %c CID %s OID %s",
common.FormatSimple(fmt.Sprintf("%-20d", r.epoch), tcell.ColorAqua),
tview.Borders.Vertical,
common.FormatSimple(fmt.Sprintf("%-44s", r.cnt), tcell.ColorAqua),
common.FormatSimple(fmt.Sprintf("%-44s", r.obj), tcell.ColorAqua),
)
}
func (r *ObjectToExpirationEpochRecord) String() string {
return fmt.Sprintf(
"OID %s %c exp. epoch %s",
common.FormatSimple(fmt.Sprintf("%-44s", r.obj), tcell.ColorAqua),
tview.Borders.Vertical,
common.FormatSimple(strconv.FormatUint(r.epoch, 10), tcell.ColorAqua),
)
}

View file

@ -79,15 +79,4 @@ type (
id oid.ID
ids []oid.ID
}
ExpirationEpochToObjectRecord struct {
epoch uint64
cnt cid.ID
obj oid.ID
}
ObjectToExpirationEpochRecord struct {
obj oid.ID
epoch uint64
}
)

View file

@ -33,7 +33,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
@ -136,7 +135,6 @@ type shardCfg struct {
refillMetabase bool
refillMetabaseWorkersCount int
mode shardmode.Mode
limiter qos.Limiter
metaCfg struct {
path string
@ -256,42 +254,39 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
func (a *applicationConfiguration) updateShardConfig(c *config.Config, source *shardconfig.Config) error {
var target shardCfg
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
var newConfig shardCfg
target.refillMetabase = source.RefillMetabase()
target.refillMetabaseWorkersCount = source.RefillMetabaseWorkersCount()
target.mode = source.Mode()
target.compress = source.Compress()
target.estimateCompressibility = source.EstimateCompressibility()
target.estimateCompressibilityThreshold = source.EstimateCompressibilityThreshold()
target.uncompressableContentType = source.UncompressableContentTypes()
target.smallSizeObjectLimit = source.SmallSizeLimit()
newConfig.refillMetabase = oldConfig.RefillMetabase()
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
newConfig.mode = oldConfig.Mode()
newConfig.compress = oldConfig.Compress()
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
a.setShardWriteCacheConfig(&target, source)
a.setShardWriteCacheConfig(&newConfig, oldConfig)
a.setShardPiloramaConfig(c, &target, source)
a.setShardPiloramaConfig(c, &newConfig, oldConfig)
if err := a.setShardStorageConfig(&target, source); err != nil {
if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
return err
}
a.setMetabaseConfig(&target, source)
a.setMetabaseConfig(&newConfig, oldConfig)
a.setGCConfig(&target, source)
if err := a.setLimiter(&target, source); err != nil {
return err
}
a.setGCConfig(&newConfig, oldConfig)
a.EngineCfg.shards = append(a.EngineCfg.shards, target)
a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)
return nil
}
func (a *applicationConfiguration) setShardWriteCacheConfig(target *shardCfg, source *shardconfig.Config) {
writeCacheCfg := source.WriteCache()
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
writeCacheCfg := oldConfig.WriteCache()
if writeCacheCfg.Enabled() {
wc := &target.writecacheCfg
wc := &newConfig.writecacheCfg
wc.enabled = true
wc.path = writeCacheCfg.Path()
@ -304,10 +299,10 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(target *shardCfg, so
}
}
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, target *shardCfg, source *shardconfig.Config) {
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
if config.BoolSafe(c.Sub("tree"), "enabled") {
piloramaCfg := source.Pilorama()
pr := &target.piloramaCfg
piloramaCfg := oldConfig.Pilorama()
pr := &newConfig.piloramaCfg
pr.enabled = true
pr.path = piloramaCfg.Path()
@ -318,8 +313,8 @@ func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, targ
}
}
func (a *applicationConfiguration) setShardStorageConfig(target *shardCfg, source *shardconfig.Config) error {
blobStorCfg := source.BlobStor()
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
blobStorCfg := oldConfig.BlobStor()
storagesCfg := blobStorCfg.Storages()
ss := make([]subStorageCfg, 0, len(storagesCfg))
@ -353,13 +348,13 @@ func (a *applicationConfiguration) setShardStorageConfig(target *shardCfg, sourc
ss = append(ss, sCfg)
}
target.subStorages = ss
newConfig.subStorages = ss
return nil
}
func (a *applicationConfiguration) setMetabaseConfig(target *shardCfg, source *shardconfig.Config) {
metabaseCfg := source.Metabase()
m := &target.metaCfg
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
metabaseCfg := oldConfig.Metabase()
m := &newConfig.metaCfg
m.path = metabaseCfg.Path()
m.perm = metabaseCfg.BoltDB().Perm()
@ -367,25 +362,12 @@ func (a *applicationConfiguration) setMetabaseConfig(target *shardCfg, source *s
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
}
func (a *applicationConfiguration) setGCConfig(target *shardCfg, source *shardconfig.Config) {
gcCfg := source.GC()
target.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
target.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
target.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
target.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
}
func (a *applicationConfiguration) setLimiter(target *shardCfg, source *shardconfig.Config) error {
limitsConfig := source.Limits()
limiter, err := qos.NewLimiter(limitsConfig)
if err != nil {
return err
}
if target.limiter != nil {
target.limiter.Close()
}
target.limiter = limiter
return nil
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
gcCfg := oldConfig.GC()
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
}
// internals contains application-specific internals that are created
@ -1072,7 +1054,6 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
return pool
}),
shard.WithLimiter(shCfg.limiter),
}
return sh
}

View file

@ -11,7 +11,6 @@ import (
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
@ -77,7 +76,6 @@ func TestEngineSection(t *testing.T) {
ss := blob.Storages()
pl := sc.Pilorama()
gc := sc.GC()
limits := sc.Limits()
switch num {
case 0:
@ -136,75 +134,6 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, sc.RefillMetabase())
require.Equal(t, mode.ReadOnly, sc.Mode())
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
readLimits := limits.Read()
writeLimits := limits.Write()
require.Equal(t, 30*time.Second, readLimits.IdleTimeout)
require.Equal(t, int64(10_000), readLimits.MaxRunningOps)
require.Equal(t, int64(1_000), readLimits.MaxWaitingOps)
require.Equal(t, 45*time.Second, writeLimits.IdleTimeout)
require.Equal(t, int64(1_000), writeLimits.MaxRunningOps)
require.Equal(t, int64(100), writeLimits.MaxWaitingOps)
require.ElementsMatch(t, readLimits.Tags,
[]limitsconfig.IOTagConfig{
{
Tag: "internal",
Weight: toPtr(20),
ReservedOps: toPtr(1000),
LimitOps: toPtr(0),
},
{
Tag: "client",
Weight: toPtr(70),
ReservedOps: toPtr(10000),
},
{
Tag: "background",
Weight: toPtr(5),
LimitOps: toPtr(10000),
ReservedOps: toPtr(0),
},
{
Tag: "writecache",
Weight: toPtr(5),
LimitOps: toPtr(25000),
},
{
Tag: "policer",
Weight: toPtr(5),
LimitOps: toPtr(25000),
},
})
require.ElementsMatch(t, writeLimits.Tags,
[]limitsconfig.IOTagConfig{
{
Tag: "internal",
Weight: toPtr(200),
ReservedOps: toPtr(100),
LimitOps: toPtr(0),
},
{
Tag: "client",
Weight: toPtr(700),
ReservedOps: toPtr(1000),
},
{
Tag: "background",
Weight: toPtr(50),
LimitOps: toPtr(1000),
ReservedOps: toPtr(0),
},
{
Tag: "writecache",
Weight: toPtr(50),
LimitOps: toPtr(2500),
},
{
Tag: "policer",
Weight: toPtr(50),
LimitOps: toPtr(2500),
},
})
case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
require.Equal(t, fs.FileMode(0o644), pl.Perm())
@ -259,17 +188,6 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, sc.RefillMetabase())
require.Equal(t, mode.ReadWrite, sc.Mode())
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
readLimits := limits.Read()
writeLimits := limits.Write()
require.Equal(t, limitsconfig.DefaultIdleTimeout, readLimits.IdleTimeout)
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxRunningOps)
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxWaitingOps)
require.Equal(t, limitsconfig.DefaultIdleTimeout, writeLimits.IdleTimeout)
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxRunningOps)
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxWaitingOps)
require.Equal(t, 0, len(readLimits.Tags))
require.Equal(t, 0, len(writeLimits.Tags))
}
return nil
})
@ -283,7 +201,3 @@ func TestEngineSection(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}
func toPtr(v float64) *float64 {
return &v
}

View file

@ -4,7 +4,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
blobstorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor"
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
metabaseconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/metabase"
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
@ -126,14 +125,6 @@ func (x *Config) GC() *gcconfig.Config {
)
}
// Limits returns "limits" subsection as a limitsconfig.Config.
func (x *Config) Limits() *limitsconfig.Config {
return limitsconfig.From(
(*config.Config)(x).
Sub("limits"),
)
}
// RefillMetabase returns the value of "resync_metabase" config parameter.
//
// Returns false if the value is not a valid bool.

View file

@ -1,130 +0,0 @@
package limits
import (
"math"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"github.com/spf13/cast"
)
const (
NoLimit int64 = math.MaxInt64
DefaultIdleTimeout = 5 * time.Minute
)
// From wraps config section into Config.
func From(c *config.Config) *Config {
return (*Config)(c)
}
// Config is a wrapper over the config section
// which provides access to Shard's limits configurations.
type Config config.Config
// Read returns the value of "read" limits config section.
func (x *Config) Read() OpConfig {
return x.parse("read")
}
// Write returns the value of "write" limits config section.
func (x *Config) Write() OpConfig {
return x.parse("write")
}
func (x *Config) parse(sub string) OpConfig {
c := (*config.Config)(x).Sub(sub)
var result OpConfig
if s := config.Int(c, "max_waiting_ops"); s > 0 {
result.MaxWaitingOps = s
} else {
result.MaxWaitingOps = NoLimit
}
if s := config.Int(c, "max_running_ops"); s > 0 {
result.MaxRunningOps = s
} else {
result.MaxRunningOps = NoLimit
}
if s := config.DurationSafe(c, "idle_timeout"); s > 0 {
result.IdleTimeout = s
} else {
result.IdleTimeout = DefaultIdleTimeout
}
result.Tags = tags(c)
return result
}
type OpConfig struct {
// MaxWaitingOps returns the value of "max_waiting_ops" config parameter.
//
// Equals NoLimit if the value is not a positive number.
MaxWaitingOps int64
// MaxRunningOps returns the value of "max_running_ops" config parameter.
//
// Equals NoLimit if the value is not a positive number.
MaxRunningOps int64
// IdleTimeout returns the value of "idle_timeout" config parameter.
//
// Equals DefaultIdleTimeout if the value is not a valid duration.
IdleTimeout time.Duration
// Tags returns the value of "tags" config parameter.
//
// Equals nil if the value is not a valid tags config slice.
Tags []IOTagConfig
}
type IOTagConfig struct {
Tag string
Weight *float64
LimitOps *float64
ReservedOps *float64
}
func tags(c *config.Config) []IOTagConfig {
c = c.Sub("tags")
var result []IOTagConfig
for i := 0; ; i++ {
tag := config.String(c, strconv.Itoa(i)+".tag")
if tag == "" {
return result
}
var tagConfig IOTagConfig
tagConfig.Tag = tag
v := c.Value(strconv.Itoa(i) + ".weight")
if v != nil {
w, err := cast.ToFloat64E(v)
panicOnErr(err)
tagConfig.Weight = &w
}
v = c.Value(strconv.Itoa(i) + ".limit_ops")
if v != nil {
l, err := cast.ToFloat64E(v)
panicOnErr(err)
tagConfig.LimitOps = &l
}
v = c.Value(strconv.Itoa(i) + ".reserved_ops")
if v != nil {
r, err := cast.ToFloat64E(v)
panicOnErr(err)
tagConfig.ReservedOps = &r
}
result = append(result, tagConfig)
}
}
func panicOnErr(err error) {
if err != nil {
panic(err)
}
}

View file

@ -7,7 +7,7 @@ const (
TargetNameFlag = "target-name"
TargetNameFlagDesc = "Resource name in APE resource name format"
TargetTypeFlag = "target-type"
TargetTypeFlagDesc = "Resource type(container/namespace)"
TargetTypeFlagDesc = "Resource type(container/namespace/group/user)"
ChainIDFlag = "chain-id"
ChainIDFlagDesc = "Chain id"
ChainIDHexFlag = "chain-id-hex"

View file

@ -157,47 +157,6 @@ FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
#### Limit of concurrent workers collecting expired objects by the garbage collector
FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKER_COUNT=15
#### Limits config
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_MAX_RUNNING_OPS=10000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_MAX_WAITING_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_MAX_RUNNING_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_MAX_WAITING_OPS=100
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_IDLE_TIMEOUT=45s
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_IDLE_TIMEOUT=30s
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_TAG=internal
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_WEIGHT=20
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_LIMIT_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_0_RESERVED_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_TAG=client
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_WEIGHT=70
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_1_RESERVED_OPS=10000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_TAG=background
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_LIMIT_OPS=10000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_2_RESERVED_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_TAG=writecache
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_RESERVED_OPS=100
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_TAG=client
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_WEIGHT=700
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_1_RESERVED_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_TAG=background
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_LIMIT_OPS=1000
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_2_RESERVED_OPS=0
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_TAG=writecache
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_LIMIT_OPS=2500
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_TAG=policer
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_LIMIT_OPS=2500
## 1 shard
### Flag to refill Metabase from BlobStor

View file

@ -221,76 +221,6 @@
"remover_sleep_interval": "2m",
"expired_collector_batch_size": 1500,
"expired_collector_worker_count": 15
},
"limits": {
"read": {
"max_running_ops": 10000,
"max_waiting_ops": 1000,
"idle_timeout": "30s",
"tags": [
{
"tag": "internal",
"weight": 20,
"limit_ops": 0,
"reserved_ops": 1000
},
{
"tag": "client",
"weight": 70,
"reserved_ops": 10000
},
{
"tag": "background",
"weight": 5,
"limit_ops": 10000,
"reserved_ops": 0
},
{
"tag": "writecache",
"weight": 5,
"limit_ops": 25000
},
{
"tag": "policer",
"weight": 5,
"limit_ops": 25000
}
]
},
"write": {
"max_running_ops": 1000,
"max_waiting_ops": 100,
"idle_timeout": "45s",
"tags": [
{
"tag": "internal",
"weight": 200,
"limit_ops": 0,
"reserved_ops": 100
},
{
"tag": "client",
"weight": 700,
"reserved_ops": 1000
},
{
"tag": "background",
"weight": 50,
"limit_ops": 1000,
"reserved_ops": 0
},
{
"tag": "writecache",
"weight": 50,
"limit_ops": 2500
},
{
"tag": "policer",
"weight": 50,
"limit_ops": 2500
}
]
}
}
},
"1": {

View file

@ -227,52 +227,6 @@ storage:
expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
expired_collector_worker_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
limits:
read:
max_running_ops: 10000
max_waiting_ops: 1000
idle_timeout: 30s
tags:
- tag: internal
weight: 20
limit_ops: 0
reserved_ops: 1000
- tag: client
weight: 70
reserved_ops: 10000
- tag: background
weight: 5
limit_ops: 10000
reserved_ops: 0
- tag: writecache
weight: 5
limit_ops: 25000
- tag: policer
weight: 5
limit_ops: 25000
write:
max_running_ops: 1000
max_waiting_ops: 100
idle_timeout: 45s
tags:
- tag: internal
weight: 200
limit_ops: 0
reserved_ops: 100
- tag: client
weight: 700
reserved_ops: 1000
- tag: background
weight: 50
limit_ops: 1000
reserved_ops: 0
- tag: writecache
weight: 50
limit_ops: 2500
- tag: policer
weight: 50
limit_ops: 2500
1:
writecache:
path: tmp/1/cache # write-cache root directory

View file

@ -195,7 +195,6 @@ The following table describes configuration for each shard.
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
| `small_object_size` | `size` | `1M` | Maximum size of an object stored in blobovnicza tree. |
| `gc` | [GC config](#gc-subsection) | | GC configuration. |
| `limits` | [Shard limits config](#limits-subsection) | | Shard limits configuration. |
### `blobstor` subsection
@ -302,64 +301,6 @@ writecache:
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. |
### `limits` subsection
```yaml
limits:
max_read_running_ops: 10000
max_read_waiting_ops: 1000
max_write_running_ops: 1000
max_write_waiting_ops: 100
read:
- tag: internal
weight: 20
limit_ops: 0
reserved_ops: 1000
- tag: client
weight: 70
reserved_ops: 10000
- tag: background
weight: 5
limit_ops: 10000
reserved_ops: 0
- tag: writecache
weight: 5
limit_ops: 25000
- tag: policer
weight: 5
limit_ops: 25000
write:
- tag: internal
weight: 200
limit_ops: 0
reserved_ops: 100
- tag: client
weight: 700
reserved_ops: 1000
- tag: background
weight: 50
limit_ops: 1000
reserved_ops: 0
- tag: writecache
weight: 50
limit_ops: 2500
- tag: policer
weight: 50
limit_ops: 2500
```
| Parameter | Type | Default value | Description |
| ----------------------- | -------- | -------------- | --------------------------------------------------------------------------------------------------------------- |
| `max_read_running_ops` | `int` | 0 (no limit) | The maximum number of runnig read operations. |
| `max_read_waiting_ops` | `int` | 0 (no limit) | The maximum number of waiting read operations. |
| `max_write_running_ops` | `int` | 0 (no limit) | The maximum number of running write operations. |
| `max_write_waiting_ops` | `int` | 0 (no limit) | The maximum number of running write operations. |
| `read` | `[]tag` | empty | Array of shard read settings for tags. |
| `write` | `[]tag` | empty | Array of shard write settings for tags. |
| `tag.tag` | `string` | empty | Tag name. Allowed values: `client`, `internal`, `background`, `writecache`, `policer`. |
| `tag.weight` | `float` | 0 (no weight) | Weight for queries with the specified tag. Weights must be specified for all tags or not specified for any one. |
| `tag.limit_ops` | `float` | 0 (no limit) | Operations per second rate limit for queries with the specified tag. |
| `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
# `node` section

2
go.mod
View file

@ -8,7 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

4
go.sum
View file

@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 h1:QnAt5b2R6+hQthMOIn5ECfLAlVD8IAE5JRm1NCCOmuE=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=

View file

@ -1,9 +0,0 @@
package assert
import "strings"
func True(cond bool, details ...string) {
if !cond {
panic(strings.Join(details, " "))
}
}

View file

@ -1,146 +0,0 @@
package qos
import (
"context"
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
const (
defaultIdleTimeout time.Duration = 0
defaultShare float64 = 1.0
)
type ReleaseFunc scheduling.ReleaseFunc
type Limiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
Close()
}
type scheduler interface {
RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
Close()
}
func NewLimiter(c *limits.Config) (Limiter, error) {
if err := validateConfig(c); err != nil {
return nil, err
}
read, write := c.Read(), c.Write()
if isNoop(read, write) {
return noopLimiterInstance, nil
}
readScheduler, err := createScheduler(c.Read())
if err != nil {
return nil, fmt.Errorf("create read scheduler: %w", err)
}
writeScheduler, err := createScheduler(c.Write())
if err != nil {
return nil, fmt.Errorf("create write scheduler: %w", err)
}
return &mClockLimiter{
readScheduler: readScheduler,
writeScheduler: writeScheduler,
}, nil
}
func createScheduler(config limits.OpConfig) (scheduler, error) {
if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
return newSemaphoreScheduler(config.MaxRunningOps), nil
}
return scheduling.NewMClock(
uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps),
converToSchedulingTags(config.Tags), config.IdleTimeout)
}
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
result := make(map[string]scheduling.TagInfo)
for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
result[tag.String()] = scheduling.TagInfo{
Share: defaultShare,
}
}
for _, l := range limits {
v := result[l.Tag]
if l.Weight != nil && *l.Weight != 0 {
v.Share = *l.Weight
}
if l.LimitOps != nil && *l.LimitOps != 0 {
v.LimitIOPS = l.LimitOps
}
if l.ReservedOps != nil && *l.ReservedOps != 0 {
v.ReservedIOPS = l.ReservedOps
}
result[l.Tag] = v
}
return result
}
var (
_ Limiter = (*noopLimiter)(nil)
releaseStub ReleaseFunc = func() {}
noopLimiterInstance = &noopLimiter{}
)
func NewNoopLimiter() Limiter {
return &noopLimiter{}
}
type noopLimiter struct{}
func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
}
func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
}
func (n *noopLimiter) Close() {}
var _ Limiter = (*mClockLimiter)(nil)
type mClockLimiter struct {
readScheduler scheduler
writeScheduler scheduler
}
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.readScheduler)
}
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.writeScheduler)
}
func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
tag = IOTagClient.String()
}
if tag == IOTagCritical.String() {
return releaseStub, nil
}
rel, err := s.RequestArrival(ctx, tag)
if err != nil {
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
errors.Is(err, errSemaphoreLimitExceeded) {
return nil, &apistatus.ResourceExhausted{}
}
return nil, err
}
return ReleaseFunc(rel), nil
}
func (n *mClockLimiter) Close() {
n.readScheduler.Close()
n.writeScheduler.Close()
}

View file

@ -1,39 +0,0 @@
package qos
import (
"context"
"errors"
qosSemaphore "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
)
var (
_ scheduler = (*semaphore)(nil)
errSemaphoreLimitExceeded = errors.New("semaphore limit exceeded")
)
type semaphore struct {
s *qosSemaphore.Semaphore
}
func newSemaphoreScheduler(size int64) *semaphore {
return &semaphore{
s: qosSemaphore.NewSemaphore(size),
}
}
func (s *semaphore) Close() {}
func (s *semaphore) RequestArrival(ctx context.Context, _ string) (scheduling.ReleaseFunc, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if s.s.Acquire() {
return s.s.Release, nil
}
return nil, errSemaphoreLimitExceeded
}

View file

@ -1,101 +0,0 @@
package qos
import (
"errors"
"fmt"
"math"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
)
var errWeightsMustBeSpecified = errors.New("invalid weights: weights must be specified for all tags or not specified for any")
type tagConfig struct {
Shares, Limit, Reserved *float64
}
func validateConfig(c *limits.Config) error {
if err := validateOpConfig(c.Read()); err != nil {
return fmt.Errorf("limits 'read' section validation error: %w", err)
}
if err := validateOpConfig(c.Write()); err != nil {
return fmt.Errorf("limits 'write' section validation error: %w", err)
}
return nil
}
func validateOpConfig(c limits.OpConfig) error {
if c.MaxRunningOps <= 0 {
return fmt.Errorf("invalid 'max_running_ops = %d': must be greater than zero", c.MaxRunningOps)
}
if c.MaxWaitingOps <= 0 {
return fmt.Errorf("invalid 'max_waiting_ops = %d': must be greater than zero", c.MaxWaitingOps)
}
if c.IdleTimeout <= 0 {
return fmt.Errorf("invalid 'idle_timeout = %s': must be greater than zero", c.IdleTimeout.String())
}
if err := validateTags(c.Tags); err != nil {
return fmt.Errorf("'tags' config section validation error: %w", err)
}
return nil
}
func validateTags(configTags []limits.IOTagConfig) error {
tags := map[IOTag]tagConfig{
IOTagClient: {},
IOTagInternal: {},
IOTagBackground: {},
IOTagWritecache: {},
IOTagPolicer: {},
}
for _, t := range configTags {
tag, err := FromRawString(t.Tag)
if err != nil {
return fmt.Errorf("invalid tag %s: %w", t.Tag, err)
}
if _, ok := tags[tag]; !ok {
return fmt.Errorf("tag %s is not configurable", t.Tag)
}
tags[tag] = tagConfig{
Shares: t.Weight,
Limit: t.LimitOps,
Reserved: t.ReservedOps,
}
}
idx := 0
var shares float64
for t, v := range tags {
if idx == 0 {
idx++
shares = float64Value(v.Shares)
} else if (shares != 0 && float64Value(v.Shares) == 0) || (shares == 0 && float64Value(v.Shares) != 0) {
return errWeightsMustBeSpecified
}
if float64Value(v.Shares) < 0 || math.IsNaN(float64Value(v.Shares)) {
return fmt.Errorf("invalid weight for tag %s: must be positive value", t.String())
}
if float64Value(v.Limit) < 0 || math.IsNaN(float64Value(v.Limit)) {
return fmt.Errorf("invalid limit_ops for tag %s: must be positive value", t.String())
}
if float64Value(v.Reserved) < 0 || math.IsNaN(float64Value(v.Reserved)) {
return fmt.Errorf("invalid reserved_ops for tag %s: must be positive value", t.String())
}
}
return nil
}
func float64Value(f *float64) float64 {
if f == nil {
return 0.0
}
return *f
}
func isNoop(read, write limits.OpConfig) bool {
return read.MaxRunningOps == limits.NoLimit &&
read.MaxWaitingOps == limits.NoLimit &&
write.MaxRunningOps == limits.NoLimit &&
write.MaxWaitingOps == limits.NoLimit &&
len(read.Tags) == 0 &&
len(write.Tags) == 0
}

View file

@ -50,7 +50,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
var res common.RebuildRes
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage, prm.Limiter)
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
res.ObjectsMoved += completedPreviosMoves
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
@ -79,7 +79,7 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.Limiter)
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
@ -195,7 +195,7 @@ func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFil
return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(ctx, path)
blz, err := shDB.Open(ctx)
if err != nil {
@ -212,7 +212,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
if err != nil {
return 0, err
}
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, concLimiter)
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil {
return migratedObjects, err
}
@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
}, nil
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.RebuildLimiter) (uint64, error) {
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
@ -253,12 +253,7 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
})
for {
release, err := limiter.ReadRequest(ctx)
if err != nil {
return result.Load(), err
}
_, err = blz.Iterate(ctx, prm)
release()
_, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
@ -270,19 +265,13 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
release, err := limiter.AcquireWorkSlot(egCtx)
if err != nil {
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer release()
moveRelease, err := limiter.WriteRequest(ctx)
if err != nil {
return err
}
err = b.moveObject(egCtx, blz, blzPath, addr, data, meta)
moveRelease()
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
@ -370,7 +359,7 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
return b.dropDirectoryIfEmpty(filepath.Dir(path))
}
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage, rateLimiter common.RateLimiter) (uint64, error) {
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
var count uint64
var rebuildTempFilesToRemove []string
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
@ -383,24 +372,13 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
}
defer shDB.Close(ctx)
release, err := rateLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
incompletedMoves, err := blz.ListMoveInfo(ctx)
release()
if err != nil {
return true, err
}
for _, move := range incompletedMoves {
release, err := rateLimiter.WriteRequest(ctx)
if err != nil {
return false, err
}
err = b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore)
release()
if err != nil {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
return true, err
}
count++
@ -410,14 +388,9 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
return false, nil
})
for _, tmp := range rebuildTempFilesToRemove {
release, err := rateLimiter.WriteRequest(ctx)
if err != nil {
return count, err
}
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
}
release()
}
return count, err
}

View file

@ -161,10 +161,9 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
Limiter: limiter,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 1,
})
require.NoError(t, err)
@ -172,7 +171,6 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
require.Equal(t, uint64(0), rRes.FilesRemoved)
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open(context.Background()))

View file

@ -2,9 +2,7 @@ package blobovniczatree
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -78,10 +76,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
Limiter: limiter,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 60,
})
require.NoError(t, err)
@ -97,7 +94,6 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
t.Run("no rebuild single db", func(t *testing.T) {
@ -132,10 +128,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
Limiter: limiter,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 90, // 64KB / 100KB = 64%
})
require.NoError(t, err)
@ -151,7 +146,6 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
t.Run("rebuild by fill percent", func(t *testing.T) {
@ -199,10 +193,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
Limiter: limiter,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 80,
})
require.NoError(t, err)
@ -222,7 +215,6 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
t.Run("rebuild by overflow", func(t *testing.T) {
@ -274,10 +266,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
require.NoError(t, b.Open(mode.ComponentReadWrite))
require.NoError(t, b.Init())
limiter := &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
Limiter: limiter,
WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 80,
})
require.NoError(t, err)
@ -294,7 +285,6 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
})
}
@ -348,10 +338,9 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.Limiter = limiter
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rPrm.FillPercent = 1
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
@ -367,7 +356,6 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
@ -439,10 +427,9 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
limiter := &rebuildLimiterStub{}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.Limiter = limiter
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rPrm.FillPercent = 1
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
@ -458,7 +445,6 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
}
require.NoError(t, b.Close(context.Background()))
require.NoError(t, limiter.ValidateReleased())
}
type storageIDUpdateStub struct {
@ -476,36 +462,7 @@ func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Addr
return nil
}
type rebuildLimiterStub struct {
slots atomic.Int64
readRequests atomic.Int64
writeRequests atomic.Int64
}
type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
s.slots.Add(1)
return func() { s.slots.Add(-1) }, nil
}
func (s *rebuildLimiterStub) ReadRequest(context.Context) (common.ReleaseFunc, error) {
s.readRequests.Add(1)
return func() { s.readRequests.Add(-1) }, nil
}
func (s *rebuildLimiterStub) WriteRequest(context.Context) (common.ReleaseFunc, error) {
s.writeRequests.Add(1)
return func() { s.writeRequests.Add(-1) }, nil
}
func (s *rebuildLimiterStub) ValidateReleased() error {
if v := s.slots.Load(); v != 0 {
return fmt.Errorf("invalid slots value %d", v)
}
if v := s.readRequests.Load(); v != 0 {
return fmt.Errorf("invalid read requests value %d", v)
}
if v := s.writeRequests.Load(); v != 0 {
return fmt.Errorf("invalid write requests value %d", v)
}
return nil
}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}

View file

@ -13,7 +13,7 @@ type RebuildRes struct {
type RebuildPrm struct {
MetaStorage MetaStorage
Limiter RebuildLimiter
WorkerLimiter ConcurrentWorkersLimiter
FillPercent int
}
@ -21,18 +21,7 @@ type MetaStorage interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ReleaseFunc func()
type ConcurrencyLimiter interface {
AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
}
type RateLimiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
}
type RebuildLimiter interface {
ConcurrencyLimiter
RateLimiter
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}

View file

@ -13,13 +13,18 @@ type StorageIDUpdate interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, concLimiter common.RebuildLimiter, fillPercent int) error {
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error {
var summary common.RebuildRes
var rErr error
for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd,
Limiter: concLimiter,
WorkerLimiter: limiter,
FillPercent: fillPercent,
})
summary.FilesRemoved += res.FilesRemoved

View file

@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr)
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
csRes, err := sh.Shard.ContainerSize(csPrm)
if err != nil {
e.reportShardError(ctx, sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr))

View file

@ -14,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
@ -177,10 +176,7 @@ func (e *StorageEngine) reportShardError(
}
func isLogical(err error) bool {
return errors.As(err, &logicerr.Logical{}) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, new(*apistatus.ResourceExhausted))
return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
// Option represents StorageEngine's constructor option.

View file

@ -3,10 +3,8 @@ package engine
import (
"context"
"path/filepath"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -92,7 +90,6 @@ func testGetDefaultShardOptions(t testing.TB) []shard.Option {
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
shard.WithMetaBaseOptions(testGetDefaultMetabaseOptions(t)...),
shard.WithLimiter(&testQoSLimiter{t: t}),
}
}
@ -154,26 +151,3 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
},
}, smallFileStorage, largeFileStorage
}
var _ qos.Limiter = (*testQoSLimiter)(nil)
type testQoSLimiter struct {
t testing.TB
read atomic.Int64
write atomic.Int64
}
func (t *testQoSLimiter) Close() {
require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
}
func (t *testQoSLimiter) ReadRequest(context.Context) (qos.ReleaseFunc, error) {
t.read.Add(1)
return func() { t.read.Add(-1) }, nil
}
func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error) {
t.write.Add(1)
return func() { t.write.Add(-1) }, nil
}

View file

@ -339,7 +339,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(ctx, prm)
s, err := sh.ContainerSize(prm)
if err != nil {
e.log.Warn(ctx, logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true

View file

@ -4,7 +4,6 @@ import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
@ -42,7 +41,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
resGuard := &sync.Mutex{}
concLimiter := &concurrencyLimiter{semaphore: make(chan struct{}, prm.ConcurrencyLimit)}
limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit)
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {
@ -62,7 +61,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
ConcurrencyLimiter: concLimiter,
ConcurrencyLimiter: limiter,
TargetFillPercent: prm.TargetFillPercent,
})
@ -89,20 +88,3 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
}
return res, nil
}
type concurrencyLimiter struct {
semaphore chan struct{}
}
func (l *concurrencyLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
select {
case l.semaphore <- struct{}{}:
return l.releaseWorkSlot, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (l *concurrencyLimiter) releaseWorkSlot() {
<-l.semaphore
}

View file

@ -26,7 +26,7 @@ func (r ContainerSizeRes) Size() uint64 {
return r.size
}
func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) {
func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
s.m.RLock()
defer s.m.RUnlock()
@ -34,12 +34,6 @@ func (s *Shard) ContainerSize(ctx context.Context, prm ContainerSizePrm) (Contai
return ContainerSizeRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ContainerSizeRes{}, err
}
defer release()
size, err := s.metaBase.ContainerSize(prm.cnr)
if err != nil {
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
@ -75,12 +69,6 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
return ContainerCountRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ContainerCountRes{}, err
}
defer release()
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
@ -112,12 +100,6 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.metaBase.DeleteContainerSize(ctx, id)
}
@ -140,11 +122,5 @@ func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.metaBase.DeleteContainerCount(ctx, id)
}

View file

@ -395,10 +395,6 @@ func (s *Shard) Close(ctx context.Context) error {
s.gc.stop(ctx)
}
if s.opsLimiter != nil {
s.opsLimiter.Close()
}
return lastErr
}
@ -449,10 +445,6 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
return err
}
}
if c.opsLimiter != nil {
s.opsLimiter.Close()
s.opsLimiter = c.opsLimiter
}
return s.setMode(ctx, c.info.Mode)
}

View file

@ -23,12 +23,6 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
cc, err := s.metaBase.ObjectCounters()
if err != nil {
return 0, err

View file

@ -54,12 +54,6 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
return DeleteRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return DeleteRes{}, err
}
defer release()
result := DeleteRes{}
for _, addr := range prm.addr {
select {

View file

@ -53,6 +53,10 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
))
defer span.End()
var exists bool
var locked bool
var err error
s.m.RLock()
defer s.m.RUnlock()
@ -60,18 +64,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled
} else if s.info.EvacuationInProgress {
return ExistsRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ExistsRes{}, err
}
defer release()
var exists bool
var locked bool
if s.info.Mode.NoMetabase() {
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.Address

View file

@ -291,7 +291,28 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
s.log.Debug(ctx, logs.ShardGCRemoveGarbageStarted)
defer s.log.Debug(ctx, logs.ShardGCRemoveGarbageCompleted)
buf, err := s.getGarbage(ctx)
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}
return nil
})
// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(ctx, iterPrm)
if err != nil {
s.log.Warn(ctx, logs.ShardIteratorOverMetabaseGraveyardFailed,
zap.Error(err),
@ -323,39 +344,6 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
return
}
func (s *Shard) getGarbage(ctx context.Context) ([]oid.Address, error) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
buf := make([]oid.Address, 0, s.rmBatchSize)
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}
return nil
})
if err := s.metaBase.IterateOverGarbage(ctx, iterPrm); err != nil {
return nil, err
}
return buf, nil
}
func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
workerCount = max(minExpiredWorkers, s.gc.gcCfg.expiredCollectorWorkerCount)
batchSize = max(minExpiredBatchSize, s.gc.gcCfg.expiredCollectorBatchSize)
@ -434,9 +422,18 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
res, err := s.inhumeGC(ctx, expired)
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...)
inhumePrm.SetGCMark()
// inhume the collected objects
res, err := s.metaBase.Inhume(ctx, inhumePrm)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects,
zap.Error(err),
)
return
}
@ -454,12 +451,6 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
}
func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(ctx, source)
if err != nil {
@ -473,19 +464,6 @@ func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address)
return result, nil
}
func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeRes, error) {
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return meta.InhumeRes{}, err
}
defer release()
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(addrs...)
inhumePrm.SetGCMark()
return s.metaBase.Inhume(ctx, inhumePrm)
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
@ -527,17 +505,11 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
err = s.metaBase.IterateOverGraveyard(ctx, iterPrm)
release()
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
return
}
@ -626,13 +598,7 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo
return ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
err = s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
@ -655,12 +621,6 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.metaBase.FilterExpired(ctx, epoch, addresses)
}
@ -676,15 +636,12 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.InhumeTombstones(ctx, tss)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, zap.Error(err))
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.Error(err),
)
return
}
@ -707,16 +664,11 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
if s.GetMode().NoMetabase() {
return
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
unlocked, err := s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
return
}
@ -724,15 +676,13 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
var pInhume meta.InhumePrm
pInhume.SetAddresses(lockers...)
pInhume.SetForceGCMark()
release, err = s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
return
}
res, err := s.metaBase.Inhume(ctx, pInhume)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage, zap.Error(err))
s.log.Warn(ctx, logs.ShardFailureToMarkLockersAsGarbage,
zap.Error(err),
)
return
}
@ -771,15 +721,12 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
return
}
release, err := s.opsLimiter.WriteRequest(ctx)
_, err := s.metaBase.FreeLockedBy(lockers)
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
return
}
_, err = s.metaBase.FreeLockedBy(lockers)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects, zap.Error(err))
s.log.Warn(ctx, logs.ShardFailureToUnlockObjects,
zap.Error(err),
)
return
}
}
@ -803,13 +750,7 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
}
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroSizeContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
@ -821,13 +762,7 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui
}
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
ids, err := s.metaBase.ZeroCountContainers(ctx)
release()
if err != nil {
s.log.Warn(ctx, logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return

View file

@ -111,12 +111,6 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
return c.Get(ctx, prm.addr)
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return GetRes{}, err
}
defer release()
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

View file

@ -81,12 +81,6 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
headParams.SetAddress(prm.addr)
headParams.SetRaw(prm.raw)
release, limitErr := s.opsLimiter.ReadRequest(ctx)
if limitErr != nil {
return HeadRes{}, limitErr
}
defer release()
var res meta.GetRes
res, err = s.metaBase.Get(ctx, headParams)
obj = res.Header()

View file

@ -81,12 +81,6 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return InhumeRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return InhumeRes{}, err
}
defer release()
if s.hasWriteCache() {
for i := range prm.target {
_ = s.writeCache.Delete(ctx, prm.target[i])

View file

@ -106,12 +106,6 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
return SelectRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, err
}
defer release()
lst, err := s.metaBase.Containers(ctx)
if err != nil {
return res, fmt.Errorf("list stored containers: %w", err)
@ -151,12 +145,6 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
return ListContainersRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ListContainersRes{}, err
}
defer release()
containers, err := s.metaBase.Containers(ctx)
if err != nil {
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
@ -185,12 +173,6 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
return ListWithCursorRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return ListWithCursorRes{}, err
}
defer release()
var metaPrm meta.ListPrm
metaPrm.SetCount(prm.count)
metaPrm.SetCursor(prm.cursor)
@ -220,15 +202,9 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
return ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err = s.metaBase.IterateOverContainers(ctx, metaPrm)
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over containers: %w", err)
}
@ -251,17 +227,11 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return err
}
defer release()
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler
err = s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("iterate over objects: %w", err)
}
@ -281,12 +251,6 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
var metaPrm meta.CountAliveObjectsInContainerPrm
metaPrm.ObjectType = prm.ObjectType
metaPrm.ContainerID = prm.ContainerID

View file

@ -38,13 +38,7 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
err = s.metaBase.Lock(ctx, idCnr, locker, locked)
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
if err != nil {
return fmt.Errorf("metabase lock: %w", err)
}
@ -67,12 +61,6 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return false, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
var prm meta.IsLockedPrm
prm.SetAddress(addr)
@ -98,12 +86,5 @@ func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error
if m.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.metaBase.GetLocks(ctx, addr)
}

View file

@ -67,12 +67,6 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
var res common.PutRes
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return PutRes{}, err
}
defer release()
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()

View file

@ -131,12 +131,6 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
return obj, nil
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return RngRes{}, err
}
defer release()
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)

View file

@ -8,7 +8,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -21,8 +20,36 @@ import (
var ErrRebuildInProgress = errors.New("shard rebuild in progress")
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type rebuildLimiter struct {
semaphore chan struct{}
}
func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}
type rebuildTask struct {
concurrencyLimiter common.RebuildLimiter
limiter RebuildWorkerLimiter
fillPercent int
}
@ -63,14 +90,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
if !ok {
continue
}
runRebuild(ctx, bs, mb, log, t.fillPercent, t.concurrencyLimiter)
runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
}
}
}()
}
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
fillPercent int, concLimiter common.RebuildLimiter,
fillPercent int, limiter RebuildWorkerLimiter,
) {
select {
case <-ctx.Done():
@ -79,20 +106,20 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
}
log.Info(ctx, logs.BlobstoreRebuildStarted)
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully)
}
}
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter common.RebuildLimiter, fillPercent int,
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int,
) error {
select {
case <-ctx.Done():
return ctx.Err()
case r.tasks <- rebuildTask{
concurrencyLimiter: limiter,
limiter: limiter,
fillPercent: fillPercent,
}:
return nil
@ -142,7 +169,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
}
type RebuildPrm struct {
ConcurrencyLimiter common.ConcurrencyLimiter
ConcurrencyLimiter RebuildWorkerLimiter
TargetFillPercent uint32
}
@ -164,30 +191,5 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
return ErrDegradedMode
}
limiter := &rebuildLimiter{
concurrencyLimiter: p.ConcurrencyLimiter,
rateLimiter: s.opsLimiter,
}
return s.rb.ScheduleRebuild(ctx, limiter, int(p.TargetFillPercent))
}
var _ common.RebuildLimiter = (*rebuildLimiter)(nil)
type rebuildLimiter struct {
concurrencyLimiter common.ConcurrencyLimiter
rateLimiter qos.Limiter
}
func (r *rebuildLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
return r.concurrencyLimiter.AcquireWorkSlot(ctx)
}
func (r *rebuildLimiter) ReadRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.ReadRequest(ctx)
return common.ReleaseFunc(release), err
}
func (r *rebuildLimiter) WriteRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.WriteRequest(ctx)
return common.ReleaseFunc(release), err
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent))
}

View file

@ -60,12 +60,6 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
return SelectRes{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return SelectRes{}, nil
}
defer release()
var selectPrm meta.SelectPrm
selectPrm.SetFilters(prm.filters)
selectPrm.SetContainerID(prm.cnr)

View file

@ -7,7 +7,6 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -99,8 +98,6 @@ type cfg struct {
reportErrorFunc func(ctx context.Context, selfID string, message string, err error)
containerInfo container.InfoProvider
opsLimiter qos.Limiter
}
func defaultCfg() *cfg {
@ -112,7 +109,6 @@ func defaultCfg() *cfg {
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
metricsWriter: noopMetrics{},
opsLimiter: qos.NewNoopLimiter(),
}
}
@ -372,12 +368,6 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
}
}
func WithLimiter(l qos.Limiter) Option {
return func(c *cfg) {
c.opsLimiter = l
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

View file

@ -43,11 +43,6 @@ func (s *Shard) TreeMove(ctx context.Context, d pilorama.CIDDescriptor, treeID s
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeMove(ctx, d, treeID, m)
}
@ -80,11 +75,6 @@ func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, tre
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeAddByPath(ctx, d, treeID, attr, path, meta)
}
@ -113,11 +103,6 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
}
@ -145,11 +130,6 @@ func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
}
@ -177,11 +157,6 @@ func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
}
@ -207,11 +182,6 @@ func (s *Shard) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, n
if s.info.Mode.NoMetabase() {
return pilorama.Meta{}, 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return pilorama.Meta{}, 0, err
}
defer release()
return s.pilorama.TreeGetMeta(ctx, cid, treeID, nodeID)
}
@ -237,11 +207,6 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeGetChildren(ctx, cid, treeID, nodeID)
}
@ -266,11 +231,6 @@ func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID
if s.info.Mode.NoMetabase() {
return nil, last, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, last, err
}
defer release()
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}
@ -296,11 +256,6 @@ func (s *Shard) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string,
if s.info.Mode.NoMetabase() {
return pilorama.Move{}, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return pilorama.Move{}, err
}
defer release()
return s.pilorama.TreeGetOpLog(ctx, cid, treeID, height)
}
@ -325,11 +280,6 @@ func (s *Shard) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) erro
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeDrop(ctx, cid, treeID)
}
@ -353,11 +303,6 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeList(ctx, cid)
}
@ -381,11 +326,6 @@ func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (u
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeHeight(ctx, cid, treeID)
}
@ -410,11 +350,6 @@ func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (b
if s.info.Mode.NoMetabase() {
return false, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
defer release()
return s.pilorama.TreeExists(ctx, cid, treeID)
}
@ -443,11 +378,6 @@ func (s *Shard) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, tre
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeUpdateLastSyncHeight(ctx, cid, treeID, height)
}
@ -472,11 +402,6 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return 0, err
}
defer release()
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
}
@ -498,11 +423,6 @@ func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
release, err := s.opsLimiter.ReadRequest(ctx)
if err != nil {
return nil, err
}
defer release()
return s.pilorama.TreeListTrees(ctx, prm)
}
@ -532,10 +452,5 @@ func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID strin
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}

View file

@ -67,12 +67,6 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}
@ -130,13 +124,6 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
close(started)
defer cleanup()
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
defer release()
s.log.Info(ctx, logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
s.log.Warn(ctx, logs.FailedToSealWritecacheAsync, zap.Error(err))
@ -151,11 +138,5 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return nil
}
}
release, err := s.opsLimiter.WriteRequest(ctx)
if err != nil {
return err
}
defer release()
return s.writeCache.Seal(ctx, prm)
}

View file

@ -163,7 +163,7 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
if err != nil {
a.failed = true
}
if err != nil && !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
audit.LogRequestWithKey(ctx, a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key,
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
!a.failed)

View file

@ -214,9 +214,6 @@ func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
}
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
if s.patcher == nil {
return nil, errors.New("uninitialized patch streamer")
}
patcherResp, err := s.patcher.Close(ctx)
if err != nil {
return nil, err

View file

@ -3,8 +3,6 @@ package object
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
)
@ -122,24 +120,13 @@ type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
s qosSendRecv[TReq, TResp]
adj AdjustIOTag
ioTag string
ioTagDefined bool
}
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
if q.ioTagDefined {
ctx = tagging.ContextWithIOTag(ctx, q.ioTag)
}
return q.s.CloseAndRecv(ctx)
}
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
if !q.ioTagDefined {
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
q.ioTag, q.ioTagDefined = tagging.IOTagFromContext(ctx)
}
assert.True(q.ioTagDefined, "io tag undefined after incoming tag adjustment")
ctx = tagging.ContextWithIOTag(ctx, q.ioTag)
return q.s.Send(ctx, req)
}