forked from TrueCloudLab/frostfs-node
Compare commits
17 commits
master
...
feature/46
Author | SHA1 | Date | |
---|---|---|---|
0530f04670 | |||
80b581d499 | |||
805862f4b7 | |||
426cf58b98 | |||
edbe06e07e | |||
cbfeb72466 | |||
053a195ac2 | |||
cfc5ce7853 | |||
c3fa902780 | |||
6010dfdf3d | |||
a6c9a337cd | |||
b1a1b2107d | |||
d7838790c6 | |||
20b4447df7 | |||
9ba48c582d | |||
4358d3c423 | |||
afd2ba9a66 |
38 changed files with 791 additions and 125 deletions
110
cmd/frostfs-adm/internal/modules/morph/download.go
Normal file
110
cmd/frostfs-adm/internal/modules/morph/download.go
Normal file
|
@ -0,0 +1,110 @@
|
|||
package morph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/sdk/gitea"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func downloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error) {
|
||||
cmd.Printf("Downloading contracts archive from '%s'\n", url)
|
||||
|
||||
// HTTP client with connect timeout
|
||||
client := http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
}).DialContext,
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(cmd.Context(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't fetch contracts archive: %w", err)
|
||||
}
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
type GiteaGlient interface {
|
||||
ListReleases(owner, repo string, opt gitea.ListReleasesOptions) ([]*gitea.Release, *gitea.Response, error)
|
||||
}
|
||||
|
||||
type GiteaRepo struct {
|
||||
client GiteaGlient
|
||||
}
|
||||
|
||||
func New() GiteaRepo {
|
||||
client, _ := gitea.NewClient("https://git.frostfs.info")
|
||||
return GiteaRepo{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GiteaRepo) ListReleases(owner, repo string, opt gitea.ListReleasesOptions) ([]*gitea.Release, *gitea.Response, error) {
|
||||
return g.client.ListReleases(owner, repo, opt)
|
||||
}
|
||||
|
||||
type Downloader interface {
|
||||
DownloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
type ContractDownloader struct{}
|
||||
|
||||
func (d *ContractDownloader) DownloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error) {
|
||||
return downloadContracts(cmd, url)
|
||||
}
|
||||
|
||||
func downloadContractsFromRepository(cmd *cobra.Command) (io.ReadCloser, error) {
|
||||
giteaRepo := new(GiteaRepo)
|
||||
downloader := new(ContractDownloader)
|
||||
return downloadContractsFromGitea(cmd, giteaRepo, downloader)
|
||||
}
|
||||
|
||||
func downloadContractsFromGitea(cmd *cobra.Command, giteaRepo *GiteaRepo, downloader Downloader) (io.ReadCloser, error) {
|
||||
releases, _, err := giteaRepo.ListReleases("TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't fetch release information: %w", err)
|
||||
|
||||
}
|
||||
|
||||
var latestRelease *gitea.Release
|
||||
for _, r := range releases {
|
||||
if !r.IsDraft && !r.IsPrerelease {
|
||||
latestRelease = r
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if latestRelease == nil {
|
||||
return nil, fmt.Errorf("attempt to fetch contracts archive from the official repository failed: no releases found")
|
||||
}
|
||||
|
||||
var url string
|
||||
for _, a := range latestRelease.Attachments {
|
||||
if strings.HasPrefix(a.Name, "frostfs-contract") {
|
||||
url = a.DownloadURL
|
||||
break
|
||||
}
|
||||
}
|
||||
if url == "" {
|
||||
return nil, errors.New("can't find contracts archive in the latest release")
|
||||
}
|
||||
|
||||
return downloader.DownloadContracts(cmd, url)
|
||||
}
|
186
cmd/frostfs-adm/internal/modules/morph/download_test.go
Normal file
186
cmd/frostfs-adm/internal/modules/morph/download_test.go
Normal file
|
@ -0,0 +1,186 @@
|
|||
package morph
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"code.gitea.io/sdk/gitea"
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type MockGiteaRepo struct {
|
||||
mock.Mock
|
||||
*gitea.Client
|
||||
}
|
||||
|
||||
func (m *MockGiteaRepo) ListReleases(owner, repo string, opt gitea.ListReleasesOptions) ([]*gitea.Release, *gitea.Response, error) {
|
||||
args := m.Called(owner, repo, opt)
|
||||
return args.Get(0).([]*gitea.Release), args.Get(1).(*gitea.Response), args.Error(2)
|
||||
}
|
||||
|
||||
type MockDownloader struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockDownloader) DownloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error) {
|
||||
args := m.Called(cmd, url)
|
||||
return args.Get(0).(io.ReadCloser), args.Error(1)
|
||||
}
|
||||
|
||||
func TestGiteaRepo_DownloadContracts_ListReleasesFailure(t *testing.T) {
|
||||
m := MockGiteaRepo{}
|
||||
giteaRepo := GiteaRepo{
|
||||
client: &m,
|
||||
}
|
||||
|
||||
m.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
|
||||
Return([]*gitea.Release{}, &gitea.Response{Response: nil}, fmt.Errorf("failed"))
|
||||
res, err := downloadContractsFromGitea(nil, &giteaRepo, nil)
|
||||
assert.Nil(t, res, "result must be nil")
|
||||
require.Error(t, err, "error must be provided")
|
||||
assert.EqualError(t, err, "can't fetch release information: failed")
|
||||
}
|
||||
|
||||
func TestGiteaRepo_DownloadContracts_ListReleasesEmpty(t *testing.T) {
|
||||
m := MockGiteaRepo{}
|
||||
giteaRepo := GiteaRepo{
|
||||
client: &m,
|
||||
}
|
||||
m.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
|
||||
Return([]*gitea.Release{}, &gitea.Response{Response: nil}, nil)
|
||||
res, err := downloadContractsFromGitea(nil, &giteaRepo, nil)
|
||||
assert.Nil(t, res, "result must be nil")
|
||||
require.Error(t, err, "error must be provided")
|
||||
assert.EqualError(t, err, "attempt to fetch contracts archive from the official repository failed: no releases found")
|
||||
}
|
||||
|
||||
func TestGiteaRepo_DownloadContracts_ListReleasesOnlyDraftsAndPrereleases(t *testing.T) {
|
||||
mockRepo := MockGiteaRepo{}
|
||||
giteaRepo := GiteaRepo{
|
||||
client: &mockRepo,
|
||||
}
|
||||
|
||||
mockDownloader := MockDownloader{}
|
||||
|
||||
attachment := gitea.Attachment{
|
||||
ID: 1,
|
||||
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/frostfs-contract-v0.19.2.tar.gz",
|
||||
Name: "frostfs-contract",
|
||||
}
|
||||
|
||||
draft := gitea.Release{
|
||||
ID: 1,
|
||||
Title: "Draft",
|
||||
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2",
|
||||
IsDraft: true,
|
||||
IsPrerelease: false,
|
||||
Attachments: []*gitea.Attachment{&attachment},
|
||||
}
|
||||
|
||||
prerelease := gitea.Release{
|
||||
ID: 2,
|
||||
Title: "Prerelease",
|
||||
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.20.1",
|
||||
IsDraft: false,
|
||||
IsPrerelease: true,
|
||||
}
|
||||
|
||||
cmd := cobra.Command{}
|
||||
|
||||
mockRepo.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
|
||||
Return([]*gitea.Release{&draft, &prerelease}, &gitea.Response{Response: nil}, nil)
|
||||
mockDownloader.On("DownloadContracts", &cmd, attachment.DownloadURL).
|
||||
Return(io.NopCloser(bytes.NewReader([]byte("data"))), nil)
|
||||
|
||||
res, err := downloadContractsFromGitea(&cmd, &giteaRepo, &mockDownloader)
|
||||
|
||||
assert.Nil(t, res, "result must be nil")
|
||||
require.Error(t, err, "error must be provided")
|
||||
assert.EqualError(t, err, "attempt to fetch contracts archive from the offitial repository failed: no releases found")
|
||||
}
|
||||
|
||||
func TestGiteaRepo_DownloadContracts_ListReleasesWrongAttachments(t *testing.T) {
|
||||
mockRepo := MockGiteaRepo{}
|
||||
giteaRepo := GiteaRepo{
|
||||
client: &mockRepo,
|
||||
}
|
||||
|
||||
attachment1 := gitea.Attachment{
|
||||
ID: 1,
|
||||
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/incorrect-name-v0.19.2.tar.gz",
|
||||
Name: "incorrect-name-1",
|
||||
}
|
||||
|
||||
attachment2 := gitea.Attachment{
|
||||
ID: 2,
|
||||
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/incorrect-name-v0.19.3.tar.gz",
|
||||
Name: "incorrect-name-2",
|
||||
}
|
||||
|
||||
release := gitea.Release{
|
||||
ID: 1,
|
||||
Title: "Title",
|
||||
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2",
|
||||
IsDraft: false,
|
||||
IsPrerelease: false,
|
||||
Attachments: []*gitea.Attachment{&attachment1, &attachment2},
|
||||
}
|
||||
|
||||
cmd := cobra.Command{}
|
||||
|
||||
mockRepo.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
|
||||
Return([]*gitea.Release{&release}, &gitea.Response{Response: nil}, nil)
|
||||
|
||||
res, err := downloadContractsFromGitea(&cmd, &giteaRepo, nil)
|
||||
|
||||
assert.Nil(t, res, "result must be nil")
|
||||
require.Error(t, err, "error must be provided")
|
||||
assert.EqualError(t, err, "can't find contracts archive in the latest release")
|
||||
}
|
||||
|
||||
func TestGiteaRepo_DownloadContracts_Success(t *testing.T) {
|
||||
mockRepo := MockGiteaRepo{}
|
||||
giteaRepo := GiteaRepo{
|
||||
client: &mockRepo,
|
||||
}
|
||||
|
||||
mockDownloader := MockDownloader{}
|
||||
|
||||
attachment1 := gitea.Attachment{
|
||||
ID: 1,
|
||||
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/incorrect-name-v0.19.2.tar.gz",
|
||||
Name: "incorrect-name-1",
|
||||
}
|
||||
|
||||
attachment2 := gitea.Attachment{
|
||||
ID: 2,
|
||||
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/frostfs-contract-v0.19.2.tar.gz",
|
||||
Name: "frostfs-contract",
|
||||
}
|
||||
|
||||
release := gitea.Release{
|
||||
ID: 1,
|
||||
Title: "Title",
|
||||
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2",
|
||||
IsDraft: false,
|
||||
IsPrerelease: false,
|
||||
Attachments: []*gitea.Attachment{&attachment1, &attachment2},
|
||||
}
|
||||
|
||||
cmd := cobra.Command{}
|
||||
|
||||
mockRepo.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
|
||||
Return([]*gitea.Release{&release}, &gitea.Response{Response: nil}, nil)
|
||||
mockDownloader.On("DownloadContracts", &cmd, attachment2.DownloadURL).
|
||||
Return(io.NopCloser(bytes.NewReader([]byte("data"))), nil)
|
||||
|
||||
res, err := downloadContractsFromGitea(&cmd, &giteaRepo, &mockDownloader)
|
||||
|
||||
assert.NotNil(t, res, "result must not be nil")
|
||||
require.NoError(t, err, "error must not be provided")
|
||||
mockDownloader.AssertCalled(t, "DownloadContracts", &cmd, attachment2.DownloadURL)
|
||||
}
|
|
@ -37,18 +37,14 @@ func forceNewEpochCmd(cmd *cobra.Command, _ []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := wCtx.sendConsensusTx(bw.Bytes()); err != nil {
|
||||
return err
|
||||
if err = wCtx.sendConsensusTx(bw.Bytes()); err == nil {
|
||||
err = wCtx.awaitTx()
|
||||
}
|
||||
|
||||
if err := wCtx.awaitTx(); err != nil {
|
||||
if strings.Contains(err.Error(), "invalid epoch") {
|
||||
cmd.Println("Epoch has already ticked.")
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
if err != nil && strings.Contains(err.Error(), "invalid epoch") {
|
||||
cmd.Println("Epoch has already ticked.")
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func emitNewEpochCall(bw *io.BufBinWriter, wCtx *initializeContext, nmHash util.Uint160) error {
|
||||
|
|
|
@ -50,6 +50,7 @@ type initializeContext struct {
|
|||
Contracts map[string]*contractState
|
||||
Command *cobra.Command
|
||||
ContractPath string
|
||||
ContractURL string
|
||||
}
|
||||
|
||||
var ErrTooManyAlphabetNodes = fmt.Errorf("too many alphabet nodes (maximum allowed is %d)", maxAlphabetNodes)
|
||||
|
@ -152,6 +153,11 @@ func newInitializeContext(cmd *cobra.Command, v *viper.Viper) (*initializeContex
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var ctrURL string
|
||||
if needContracts {
|
||||
ctrURL, _ = cmd.Flags().GetString(contractsURLFlag)
|
||||
}
|
||||
|
||||
if err := checkNotaryEnabled(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -176,6 +182,7 @@ func newInitializeContext(cmd *cobra.Command, v *viper.Viper) (*initializeContex
|
|||
Command: cmd,
|
||||
Contracts: make(map[string]*contractState),
|
||||
ContractPath: ctrPath,
|
||||
ContractURL: ctrURL,
|
||||
}
|
||||
|
||||
if needContracts {
|
||||
|
|
|
@ -399,10 +399,14 @@ func (c *initializeContext) readContracts(names []string) error {
|
|||
}
|
||||
} else {
|
||||
var r io.ReadCloser
|
||||
if c.ContractPath == "" {
|
||||
return errors.New("contracts flag is missing")
|
||||
if c.ContractPath != "" {
|
||||
r, err = os.Open(c.ContractPath)
|
||||
} else if c.ContractURL != "" {
|
||||
r, err = downloadContracts(c.Command, c.ContractURL)
|
||||
} else {
|
||||
r, err = downloadContractsFromRepository(c.Command)
|
||||
}
|
||||
r, err = os.Open(c.ContractPath)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open contracts archive: %w", err)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,9 @@ const (
|
|||
storageGasCLIFlag = "initial-gas"
|
||||
storageGasConfigFlag = "storage.initial_gas"
|
||||
contractsInitFlag = "contracts"
|
||||
contractsInitFlagDesc = "Path to archive with compiled FrostFS contracts (the default is to fetch the latest release from the official repository)"
|
||||
contractsURLFlag = "contracts-url"
|
||||
contractsURLFlagDesc = "URL to archive with compiled FrostFS contracts"
|
||||
maxObjectSizeInitFlag = "network.max_object_size"
|
||||
maxObjectSizeCLIFlag = "max-object-size"
|
||||
epochDurationInitFlag = "network.epoch_duration"
|
||||
|
@ -370,8 +373,9 @@ func initUpdateContractsCmd() {
|
|||
RootCmd.AddCommand(updateContractsCmd)
|
||||
updateContractsCmd.Flags().String(alphabetWalletsFlag, "", alphabetWalletsFlagDesc)
|
||||
updateContractsCmd.Flags().StringP(endpointFlag, endpointFlagShort, "", endpointFlagDesc)
|
||||
updateContractsCmd.Flags().String(contractsInitFlag, "", "Path to archive with compiled FrostFS contracts")
|
||||
_ = updateContractsCmd.MarkFlagRequired(contractsInitFlag)
|
||||
updateContractsCmd.Flags().String(contractsInitFlag, "", contractsInitFlagDesc)
|
||||
updateContractsCmd.Flags().String(contractsURLFlag, "", contractsURLFlagDesc)
|
||||
updateContractsCmd.MarkFlagsMutuallyExclusive(contractsInitFlag, contractsURLFlag)
|
||||
}
|
||||
|
||||
func initDumpBalancesCmd() {
|
||||
|
@ -441,8 +445,8 @@ func initInitCmd() {
|
|||
RootCmd.AddCommand(initCmd)
|
||||
initCmd.Flags().String(alphabetWalletsFlag, "", alphabetWalletsFlagDesc)
|
||||
initCmd.Flags().StringP(endpointFlag, endpointFlagShort, "", endpointFlagDesc)
|
||||
initCmd.Flags().String(contractsInitFlag, "", "Path to archive with compiled FrostFS contracts")
|
||||
_ = initCmd.MarkFlagRequired(contractsInitFlag)
|
||||
initCmd.Flags().String(contractsInitFlag, "", contractsInitFlagDesc)
|
||||
initCmd.Flags().String(contractsURLFlag, "", contractsURLFlagDesc)
|
||||
initCmd.Flags().Uint(epochDurationCLIFlag, 240, "Amount of side chain blocks in one FrostFS epoch")
|
||||
initCmd.Flags().Uint(maxObjectSizeCLIFlag, 67108864, "Max single object size in bytes")
|
||||
initCmd.Flags().Bool(homomorphicHashDisabledCLIFlag, false, "Disable object homomorphic hashing")
|
||||
|
@ -451,6 +455,7 @@ func initInitCmd() {
|
|||
initCmd.Flags().Uint64(containerAliasFeeCLIFlag, 500, "Container alias fee")
|
||||
initCmd.Flags().String(protoConfigPath, "", "Path to the consensus node configuration")
|
||||
initCmd.Flags().String(localDumpFlag, "", "Path to the blocks dump file")
|
||||
initCmd.MarkFlagsMutuallyExclusive(contractsInitFlag, contractsURLFlag)
|
||||
}
|
||||
|
||||
func initGenerateAlphabetCmd() {
|
||||
|
|
|
@ -1,16 +1,13 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/util"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
@ -33,6 +30,8 @@ var addRuleCmd = &cobra.Command{
|
|||
func addRule(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
target := parseTarget(cmd)
|
||||
|
||||
chainID, _ := cmd.Flags().GetString(chainIDFlag)
|
||||
hexEncoded, _ := cmd.Flags().GetBool(chainIDHexFlag)
|
||||
|
||||
|
@ -44,13 +43,6 @@ func addRule(cmd *cobra.Command, _ []string) {
|
|||
commonCmd.ExitOnErr(cmd, "can't decode chain ID as hex: %w", err)
|
||||
}
|
||||
|
||||
var cnr cid.ID
|
||||
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
|
||||
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
cnr.Encode(rawCID)
|
||||
|
||||
rule, _ := cmd.Flags().GetStringArray(ruleFlag)
|
||||
|
||||
chain := new(apechain.Chain)
|
||||
|
@ -63,11 +55,8 @@ func addRule(cmd *cobra.Command, _ []string) {
|
|||
|
||||
req := &control.AddChainLocalOverrideRequest{
|
||||
Body: &control.AddChainLocalOverrideRequest_Body{
|
||||
Target: &control.ChainTarget{
|
||||
Type: control.ChainTarget_CONTAINER,
|
||||
Name: cidStr,
|
||||
},
|
||||
Chain: serializedChain,
|
||||
Target: target,
|
||||
Chain: serializedChain,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -91,8 +80,10 @@ func initControlAddRuleCmd() {
|
|||
initControlFlags(addRuleCmd)
|
||||
|
||||
ff := addRuleCmd.Flags()
|
||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
ff.StringArray(ruleFlag, []string{}, "Rule statement")
|
||||
ff.String(chainIDFlag, "", "Assign ID to the parsed chain")
|
||||
ff.String(targetNameFlag, "", targetNameDesc)
|
||||
ff.String(targetTypeFlag, "", targetTypeDesc)
|
||||
_ = addRuleCmd.MarkFlagRequired(targetTypeFlag)
|
||||
ff.Bool(chainIDHexFlag, false, "Flag to parse chain ID as hex")
|
||||
}
|
||||
|
|
49
cmd/frostfs-cli/modules/control/detach_shards.go
Normal file
49
cmd/frostfs-cli/modules/control/detach_shards.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var shardsDetachCmd = &cobra.Command{
|
||||
Use: "detach",
|
||||
Short: "Detach and close the shards",
|
||||
Long: "Detach and close the shards",
|
||||
Run: shardsDetach,
|
||||
}
|
||||
|
||||
func shardsDetach(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
req := &control.DetachShardsRequest{
|
||||
Body: &control.DetachShardsRequest_Body{
|
||||
Shard_ID: getShardIDListFromIDFlag(cmd, false),
|
||||
},
|
||||
}
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var resp *control.DetachShardsResponse
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *rawclient.Client) error {
|
||||
resp, err = control.DetachShards(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
cmd.Println("Shard mode update request successfully sent.")
|
||||
}
|
||||
|
||||
func initControlShardsDetachCmd() {
|
||||
initControlFlags(shardsDetachCmd)
|
||||
|
||||
flags := shardsDetachCmd.Flags()
|
||||
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||
}
|
|
@ -1,16 +1,13 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/util"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
@ -25,12 +22,7 @@ var getRuleCmd = &cobra.Command{
|
|||
func getRule(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
var cnr cid.ID
|
||||
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
|
||||
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
cnr.Encode(rawCID)
|
||||
target := parseTarget(cmd)
|
||||
|
||||
chainID, _ := cmd.Flags().GetString(chainIDFlag)
|
||||
hexEncoded, _ := cmd.Flags().GetBool(chainIDHexFlag)
|
||||
|
@ -43,10 +35,7 @@ func getRule(cmd *cobra.Command, _ []string) {
|
|||
|
||||
req := &control.GetChainLocalOverrideRequest{
|
||||
Body: &control.GetChainLocalOverrideRequest_Body{
|
||||
Target: &control.ChainTarget{
|
||||
Name: cidStr,
|
||||
Type: control.ChainTarget_CONTAINER,
|
||||
},
|
||||
Target: target,
|
||||
ChainId: []byte(chainID),
|
||||
},
|
||||
}
|
||||
|
@ -74,7 +63,9 @@ func initControGetRuleCmd() {
|
|||
initControlFlags(getRuleCmd)
|
||||
|
||||
ff := getRuleCmd.Flags()
|
||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
ff.String(targetNameFlag, "", targetNameDesc)
|
||||
ff.String(targetTypeFlag, "", targetTypeDesc)
|
||||
_ = getRuleCmd.MarkFlagRequired(targetTypeFlag)
|
||||
ff.String(chainIDFlag, "", "Chain id")
|
||||
ff.Bool(chainIDHexFlag, false, "Flag to parse chain ID as hex")
|
||||
}
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/util"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"github.com/nspcc-dev/neo-go/cli/input"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
@ -21,22 +22,56 @@ var listRulesCmd = &cobra.Command{
|
|||
Run: listRules,
|
||||
}
|
||||
|
||||
const (
|
||||
defaultNamespace = "root"
|
||||
namespaceTarget = "namespace"
|
||||
containerTarget = "container"
|
||||
)
|
||||
|
||||
const (
|
||||
targetNameFlag = "target-name"
|
||||
targetNameDesc = "Resource name in APE resource name format"
|
||||
targetTypeFlag = "target-type"
|
||||
targetTypeDesc = "Resource type(container/namespace)"
|
||||
)
|
||||
|
||||
func parseTarget(cmd *cobra.Command) *control.ChainTarget {
|
||||
typ, _ := cmd.Flags().GetString(targetTypeFlag)
|
||||
name, _ := cmd.Flags().GetString(targetNameFlag)
|
||||
switch typ {
|
||||
case namespaceTarget:
|
||||
if name == "" {
|
||||
ln, err := input.ReadLine(fmt.Sprintf("Target name is not set. Confirm to use %s namespace (n|Y)> ", defaultNamespace))
|
||||
commonCmd.ExitOnErr(cmd, "read line error: %w", err)
|
||||
ln = strings.ToLower(ln)
|
||||
if len(ln) > 0 && (ln[0] == 'n') {
|
||||
commonCmd.ExitOnErr(cmd, "read namespace error: %w", fmt.Errorf("setting default value was declined"))
|
||||
}
|
||||
name = defaultNamespace
|
||||
}
|
||||
return &control.ChainTarget{
|
||||
Name: name,
|
||||
Type: control.ChainTarget_NAMESPACE,
|
||||
}
|
||||
case containerTarget:
|
||||
var cnr cid.ID
|
||||
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(name))
|
||||
return &control.ChainTarget{
|
||||
Name: name,
|
||||
Type: control.ChainTarget_CONTAINER,
|
||||
}
|
||||
default:
|
||||
commonCmd.ExitOnErr(cmd, "read target type error: %w", fmt.Errorf("unknown target type"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func listRules(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
var cnr cid.ID
|
||||
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
|
||||
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
cnr.Encode(rawCID)
|
||||
|
||||
req := &control.ListChainLocalOverridesRequest{
|
||||
Body: &control.ListChainLocalOverridesRequest_Body{
|
||||
Target: &control.ChainTarget{
|
||||
Name: cidStr,
|
||||
Type: control.ChainTarget_CONTAINER,
|
||||
},
|
||||
Target: parseTarget(cmd),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -71,5 +106,7 @@ func initControlListRulesCmd() {
|
|||
initControlFlags(listRulesCmd)
|
||||
|
||||
ff := listRulesCmd.Flags()
|
||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
ff.String(targetNameFlag, "", targetNameDesc)
|
||||
ff.String(targetTypeFlag, "", targetTypeDesc)
|
||||
_ = listRulesCmd.MarkFlagRequired(targetTypeFlag)
|
||||
}
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
@ -28,13 +25,6 @@ var removeRuleCmd = &cobra.Command{
|
|||
func removeRule(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
var cnr cid.ID
|
||||
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
|
||||
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
cnr.Encode(rawCID)
|
||||
|
||||
chainID, _ := cmd.Flags().GetString(chainIDFlag)
|
||||
hexEncoded, _ := cmd.Flags().GetBool(chainIDHexFlag)
|
||||
|
||||
|
@ -48,10 +38,7 @@ func removeRule(cmd *cobra.Command, _ []string) {
|
|||
|
||||
req := &control.RemoveChainLocalOverrideRequest{
|
||||
Body: &control.RemoveChainLocalOverrideRequest_Body{
|
||||
Target: &control.ChainTarget{
|
||||
Name: cidStr,
|
||||
Type: control.ChainTarget_CONTAINER,
|
||||
},
|
||||
Target: parseTarget(cmd),
|
||||
ChainId: chainIDRaw,
|
||||
},
|
||||
}
|
||||
|
@ -81,7 +68,9 @@ func initControlRemoveRuleCmd() {
|
|||
initControlFlags(removeRuleCmd)
|
||||
|
||||
ff := removeRuleCmd.Flags()
|
||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
ff.String(targetNameFlag, "", targetNameDesc)
|
||||
ff.String(targetTypeFlag, "", targetTypeDesc)
|
||||
_ = removeRuleCmd.MarkFlagRequired(targetTypeFlag)
|
||||
ff.String(chainIDFlag, "", "Chain id")
|
||||
ff.Bool(chainIDHexFlag, false, "Flag to parse chain ID as hex")
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ func initControlShardsCmd() {
|
|||
shardsCmd.AddCommand(flushCacheCmd)
|
||||
shardsCmd.AddCommand(doctorCmd)
|
||||
shardsCmd.AddCommand(writecacheShardCmd)
|
||||
shardsCmd.AddCommand(shardsDetachCmd)
|
||||
|
||||
initControlShardsListCmd()
|
||||
initControlSetShardModeCmd()
|
||||
|
@ -26,4 +27,5 @@ func initControlShardsCmd() {
|
|||
initControlFlushCacheCmd()
|
||||
initControlDoctorCmd()
|
||||
initControlShardsWritecacheCmd()
|
||||
initControlShardsDetachCmd()
|
||||
}
|
||||
|
|
|
@ -145,9 +145,17 @@ func getShardIDList(cmd *cobra.Command) [][]byte {
|
|||
return nil
|
||||
}
|
||||
|
||||
return getShardIDListFromIDFlag(cmd, true)
|
||||
}
|
||||
|
||||
func getShardIDListFromIDFlag(cmd *cobra.Command, withAllFlag bool) [][]byte {
|
||||
sidList, _ := cmd.Flags().GetStringSlice(shardIDFlag)
|
||||
if len(sidList) == 0 {
|
||||
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("either --%s or --%s flag must be provided", shardIDFlag, shardAllFlag))
|
||||
if withAllFlag {
|
||||
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("either --%s or --%s flag must be provided", shardIDFlag, shardAllFlag))
|
||||
} else {
|
||||
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("--%s flag value must be provided", shardIDFlag))
|
||||
}
|
||||
}
|
||||
|
||||
// We can sort the ID list and perform this check without additional allocations,
|
||||
|
|
|
@ -632,14 +632,8 @@ type cfgAccessPolicyEngine struct {
|
|||
type cfgObjectRoutines struct {
|
||||
putRemote *ants.Pool
|
||||
|
||||
putRemoteCapacity int
|
||||
|
||||
putLocal *ants.Pool
|
||||
|
||||
putLocalCapacity int
|
||||
|
||||
replicatorPoolSize int
|
||||
|
||||
replication *ants.Pool
|
||||
}
|
||||
|
||||
|
@ -1094,20 +1088,20 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
|||
|
||||
optNonBlocking := ants.WithNonblocking(true)
|
||||
|
||||
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
|
||||
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
|
||||
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
|
||||
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
|
||||
fatalOnErr(err)
|
||||
|
||||
pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal()
|
||||
pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking)
|
||||
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
|
||||
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
|
||||
fatalOnErr(err)
|
||||
|
||||
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
|
||||
if pool.replicatorPoolSize <= 0 {
|
||||
pool.replicatorPoolSize = pool.putRemoteCapacity
|
||||
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
|
||||
if replicatorPoolSize <= 0 {
|
||||
replicatorPoolSize = putRemoteCapacity
|
||||
}
|
||||
|
||||
pool.replication, err = ants.NewPool(pool.replicatorPoolSize)
|
||||
pool.replication, err = ants.NewPool(replicatorPoolSize)
|
||||
fatalOnErr(err)
|
||||
|
||||
return pool
|
||||
|
@ -1241,6 +1235,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
setRuntimeParameters(c)
|
||||
return nil
|
||||
}})
|
||||
components = append(components, dCmp{"pools", c.reloadPools})
|
||||
components = append(components, dCmp{"tracing", func() error {
|
||||
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
|
||||
if updated {
|
||||
|
@ -1285,6 +1280,28 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
}
|
||||
|
||||
func (c *cfg) reloadPools() error {
|
||||
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
|
||||
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
|
||||
|
||||
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
|
||||
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
|
||||
|
||||
newSize = replicatorconfig.PoolSize(c.appCfg)
|
||||
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
|
||||
oldSize := p.Cap()
|
||||
if oldSize != newSize {
|
||||
c.log.Info(logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
|
||||
zap.Int("old", oldSize), zap.Int("new", newSize))
|
||||
p.Tune(newSize)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cfg) reloadAppConfig() error {
|
||||
unlock := c.LockAppConfigExclusive()
|
||||
defer unlock()
|
||||
|
|
|
@ -210,7 +210,7 @@ type morphContainerReader struct {
|
|||
src containerCore.Source
|
||||
|
||||
lister interface {
|
||||
List(*user.ID) ([]cid.ID, error)
|
||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,8 +226,8 @@ func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
|||
return x.eacl.GetEACL(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) List(id *user.ID) ([]cid.ID, error) {
|
||||
return x.lister.List(id)
|
||||
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
||||
return x.lister.ContainersOf(id)
|
||||
}
|
||||
|
||||
type morphContainerWriter struct {
|
||||
|
|
|
@ -263,7 +263,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
)
|
||||
}
|
||||
}),
|
||||
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
|
||||
policer.WithPool(c.cfgObject.pool.replication),
|
||||
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
||||
)
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
||||
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
||||
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8081",
|
||||
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",
|
||||
|
|
|
@ -46,3 +46,9 @@ Shard can automatically switch to a `degraded-read-only` mode in 3 cases:
|
|||
1. If the metabase was not available or couldn't be opened/initialized during shard startup.
|
||||
2. If shard error counter exceeds threshold.
|
||||
3. If the metabase couldn't be reopened during SIGHUP handling.
|
||||
|
||||
# Detach shard
|
||||
|
||||
To detach a shard use `frostfs-cli control shards detach` command. This command removes the shards from the storage
|
||||
engine and closes all resources associated with the shards.
|
||||
Limitation: `SIGHUP` or storage node restart lead to detached shard will be again online.
|
7
go.mod
7
go.mod
|
@ -3,6 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
|
|||
go 1.20
|
||||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240112150928-72885aae835c
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.1-0.20240115082915-f2a82aa635aa
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||
|
@ -29,7 +30,7 @@ require (
|
|||
github.com/spf13/cobra v1.8.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.18.2
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/stretchr/testify v1.9.0
|
||||
go.etcd.io/bbolt v1.3.8
|
||||
go.opentelemetry.io/otel v1.22.0
|
||||
go.opentelemetry.io/otel/trace v1.22.0
|
||||
|
@ -61,8 +62,10 @@ require (
|
|||
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/davidmz/go-pageant v1.0.2 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/go-fed/httpsig v1.1.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
|
@ -71,6 +74,7 @@ require (
|
|||
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
|
||||
github.com/hashicorp/go-version v1.6.0 // indirect
|
||||
github.com/hashicorp/golang-lru v1.0.2 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/holiman/uint256 v1.2.4 // indirect
|
||||
|
@ -103,6 +107,7 @@ require (
|
|||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/spf13/afero v1.11.0 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
|
||||
github.com/twmb/murmur3 v1.1.8 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -451,6 +451,7 @@ const (
|
|||
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
|
||||
FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
|
||||
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
|
||||
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
|
||||
FrostFSNodeUpdatedConfigurationApplying = "updated configuration applying"
|
||||
FrostFSNodeConfigurationHasBeenReloadedSuccessfully = "configuration has been reloaded successfully"
|
||||
FrostFSNodeReadNewlyCreatedContainerAfterTheNotification = "read newly created container after the notification"
|
||||
|
|
|
@ -2,7 +2,9 @@ package engine
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -14,6 +16,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var errShardNotFound = logicerr.New("shard not found")
|
||||
|
@ -344,6 +347,100 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
|
||||
if len(ids) == 0 {
|
||||
return logicerr.New("ids must be non-empty")
|
||||
}
|
||||
|
||||
deletedShards, err := e.deleteShards(ids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.closeShards(deletedShards)
|
||||
}
|
||||
|
||||
// closeShards closes deleted shards. Tries to close all shards.
|
||||
// Returns single error with joined shard errors.
|
||||
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
|
||||
var multiErr error
|
||||
var multiErrGuard sync.Mutex
|
||||
var eg errgroup.Group
|
||||
for _, sh := range deletedShards {
|
||||
sh := sh
|
||||
eg.Go(func() error {
|
||||
err := sh.SetMode(mode.Disabled)
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineCouldNotChangeShardModeToDisabled,
|
||||
zap.Stringer("id", sh.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
multiErrGuard.Lock()
|
||||
multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err))
|
||||
multiErrGuard.Unlock()
|
||||
}
|
||||
|
||||
err = sh.Close()
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineCouldNotCloseRemovedShard,
|
||||
zap.Stringer("id", sh.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
multiErrGuard.Lock()
|
||||
multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err))
|
||||
multiErrGuard.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return multiErr
|
||||
}
|
||||
|
||||
// deleteShards deletes shards with specified ids from engine shard list
|
||||
// and releases all engine resources associated with shards.
|
||||
// Returns deleted shards or error if some shard could not be deleted.
|
||||
func (e *StorageEngine) deleteShards(ids []*shard.ID) ([]hashedShard, error) {
|
||||
ss := make([]hashedShard, 0, len(ids))
|
||||
|
||||
e.mtx.Lock()
|
||||
defer e.mtx.Unlock()
|
||||
|
||||
for _, id := range ids {
|
||||
idStr := id.String()
|
||||
sh, found := e.shards[idStr]
|
||||
if !found {
|
||||
return nil, errShardNotFound
|
||||
}
|
||||
ss = append(ss, sh)
|
||||
}
|
||||
|
||||
if len(ss) == len(e.shards) {
|
||||
return nil, logicerr.New("could not delete all the shards")
|
||||
}
|
||||
|
||||
for _, sh := range ss {
|
||||
idStr := sh.ID().String()
|
||||
|
||||
sh.DeleteShardMetrics()
|
||||
|
||||
delete(e.shards, idStr)
|
||||
|
||||
pool, ok := e.shardPools[idStr]
|
||||
if ok {
|
||||
pool.Release()
|
||||
delete(e.shardPools, idStr)
|
||||
}
|
||||
|
||||
e.log.Info(logs.EngineShardHasBeenRemoved,
|
||||
zap.String("id", idStr))
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
func (s hashedShard) Hash() uint64 {
|
||||
return s.hash
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -42,3 +44,21 @@ func TestRemoveShard(t *testing.T) {
|
|||
require.True(t, ok != removed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDisableShards(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const numOfShards = 2
|
||||
|
||||
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
||||
e, ids := te.engine, te.shardIDs
|
||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical))
|
||||
|
||||
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]}))
|
||||
|
||||
require.Equal(t, 1, len(e.shards))
|
||||
}
|
||||
|
|
|
@ -254,9 +254,18 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
// unmarshal object, work only with physically stored (raw == true) objects
|
||||
obj, err := db.get(tx, addr, key, false, true, currEpoch)
|
||||
if err != nil {
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
addrKey = addressKey(addr, key)
|
||||
if garbageBKT != nil {
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
return deleteSingleResult{}, nil
|
||||
}
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
|
||||
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
|
||||
if errors.As(err, &siErr) {
|
||||
// if object is virtual (parent) then do nothing, it will be deleted with last child
|
||||
return deleteSingleResult{}, nil
|
||||
}
|
||||
|
|
|
@ -182,6 +182,37 @@ func TestDelete(t *testing.T) {
|
|||
require.Equal(t, 0, len(addrs))
|
||||
}
|
||||
|
||||
func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
addr := oidtest.Address()
|
||||
|
||||
var prm meta.InhumePrm
|
||||
prm.SetAddresses(addr)
|
||||
prm.SetGCMark()
|
||||
_, err := db.Inhume(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var garbageCount int
|
||||
var itPrm meta.GarbageIterationPrm
|
||||
itPrm.SetHandler(func(g meta.GarbageObject) error {
|
||||
garbageCount++
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 1, garbageCount)
|
||||
|
||||
var delPrm meta.DeletePrm
|
||||
delPrm.SetAddresses(addr)
|
||||
_, err = db.Delete(context.Background(), delPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
garbageCount = 0
|
||||
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
||||
require.Equal(t, 0, garbageCount)
|
||||
}
|
||||
|
||||
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
|
||||
var deletePrm meta.DeletePrm
|
||||
deletePrm.SetAddresses(addrs...)
|
||||
|
|
|
@ -53,7 +53,7 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
|||
err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID)
|
||||
if err != nil {
|
||||
if errors.Is(err, unwrap.ErrNoSessionID) {
|
||||
return c.List(idUser)
|
||||
return c.list(idUser)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -8,13 +8,13 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
)
|
||||
|
||||
// List returns a list of container identifiers belonging
|
||||
// list returns a list of container identifiers belonging
|
||||
// to the specified user of FrostFS system. The list is composed
|
||||
// through Container contract call.
|
||||
//
|
||||
// Returns the identifiers of all FrostFS containers if pointer
|
||||
// to user identifier is nil.
|
||||
func (c *Client) List(idUser *user.ID) ([]cid.ID, error) {
|
||||
func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
|
||||
var rawID []byte
|
||||
|
||||
if idUser != nil {
|
||||
|
|
|
@ -26,10 +26,10 @@ type Reader interface {
|
|||
containercore.Source
|
||||
containercore.EACLSource
|
||||
|
||||
// List returns a list of container identifiers belonging
|
||||
// ContainersOf returns a list of container identifiers belonging
|
||||
// to the specified user of FrostFS system. Returns the identifiers
|
||||
// of all FrostFS containers if pointer to owner identifier is nil.
|
||||
List(*user.ID) ([]cid.ID, error)
|
||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||
}
|
||||
|
||||
// Writer is an interface of container storage updater.
|
||||
|
@ -187,7 +187,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
|||
return nil, fmt.Errorf("invalid user ID: %w", err)
|
||||
}
|
||||
|
||||
cnrs, err := s.rdr.List(&id)
|
||||
cnrs, err := s.rdr.ContainersOf(&id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ const (
|
|||
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
||||
rpcSealWriteCache = "SealWriteCache"
|
||||
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
|
||||
rpcDetachShards = "DetachShards"
|
||||
)
|
||||
|
||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||
|
@ -292,3 +293,22 @@ func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...clie
|
|||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// DetachShards executes ControlService.DetachShards RPC.
|
||||
func DetachShards(
|
||||
cli *client.Client,
|
||||
req *DetachShardsRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*DetachShardsResponse, error) {
|
||||
wResp := newResponseWrapper[DetachShardsResponse]()
|
||||
|
||||
wReq := &requestWrapper{
|
||||
m: req,
|
||||
}
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDetachShards), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
|
37
pkg/services/control/server/detach_shards.go
Normal file
37
pkg/services/control/server/detach_shards.go
Normal file
|
@ -0,0 +1,37 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
|
||||
|
||||
if err := s.s.DetachShards(shardIDs); err != nil {
|
||||
if errors.As(err, new(logicerr.Logical)) {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resp := &control.DetachShardsResponse{
|
||||
Body: &control.DetachShardsResponse_Body{},
|
||||
}
|
||||
|
||||
if err = SignMessage(s.key, resp); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
|
@ -62,6 +62,9 @@ service ControlService {
|
|||
|
||||
// Flush objects from write-cache and move it to degraded read only mode.
|
||||
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
|
||||
|
||||
// DetachShards detaches and closes shards.
|
||||
rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
|
||||
}
|
||||
|
||||
// Health check request.
|
||||
|
@ -584,3 +587,21 @@ message SealWriteCacheResponse {
|
|||
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message DetachShardsRequest {
|
||||
message Body {
|
||||
repeated bytes shard_ID = 1;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message DetachShardsResponse {
|
||||
message Body {
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
|
@ -64,8 +64,6 @@ type cfg struct {
|
|||
|
||||
taskPool *ants.Pool
|
||||
|
||||
maxCapacity int
|
||||
|
||||
batchSize, cacheSize uint32
|
||||
|
||||
rebalanceFreq, evictDuration, sleepDuration time.Duration
|
||||
|
@ -158,14 +156,6 @@ func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithMaxCapacity returns option to set max capacity
|
||||
// that can be set to the pool.
|
||||
func WithMaxCapacity(capacity int) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxCapacity = capacity
|
||||
}
|
||||
}
|
||||
|
||||
// WithPool returns option to set pool for
|
||||
// policy and replication operations.
|
||||
func WithPool(p *ants.Pool) Option {
|
||||
|
|
|
@ -66,7 +66,7 @@ func New(opts ...Option) *Policer {
|
|||
cfg: c,
|
||||
cache: cache,
|
||||
objsInWork: &objectsInWork{
|
||||
objs: make(map[oid.Address]struct{}, c.maxCapacity),
|
||||
objs: make(map[oid.Address]struct{}, c.taskPool.Cap()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,16 +48,12 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Task pool
|
||||
pool, err := ants.NewPool(4)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Policer instance
|
||||
p := New(
|
||||
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
|
||||
WithContainerSource(containerSrc),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -239,6 +235,7 @@ func TestProcessObject(t *testing.T) {
|
|||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||
}
|
||||
})),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
addrWithType := objectcore.AddressWithType{
|
||||
|
@ -276,6 +273,7 @@ func TestProcessObjectError(t *testing.T) {
|
|||
p := New(
|
||||
WithContainerSource(source),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
addrWithType := objectcore.AddressWithType{
|
||||
|
@ -296,9 +294,6 @@ func TestIteratorContract(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(4)
|
||||
require.NoError(t, err)
|
||||
|
||||
it := &predefinedIterator{
|
||||
scenario: []nextResult{
|
||||
{objs, nil},
|
||||
|
@ -324,7 +319,7 @@ func TestIteratorContract(t *testing.T) {
|
|||
WithKeySpaceIterator(it),
|
||||
WithContainerSource(containerSrc),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
WithPool(testPool(t)),
|
||||
func(c *cfg) {
|
||||
c.sleepDuration = time.Millisecond
|
||||
},
|
||||
|
@ -348,6 +343,12 @@ func TestIteratorContract(t *testing.T) {
|
|||
}, it.calls)
|
||||
}
|
||||
|
||||
func testPool(t *testing.T) *ants.Pool {
|
||||
pool, err := ants.NewPool(4)
|
||||
require.NoError(t, err)
|
||||
return pool
|
||||
}
|
||||
|
||||
type nextResult struct {
|
||||
objs []objectcore.AddressWithType
|
||||
err error
|
||||
|
|
|
@ -3,10 +3,12 @@ package policer
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -34,6 +36,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err))
|
||||
}
|
||||
|
||||
skipMap := newSkipMap()
|
||||
for i := range addrs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -55,7 +58,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
|
||||
if p.objsInWork.add(addr.Address) {
|
||||
err := p.processObject(ctx, addr)
|
||||
if err != nil {
|
||||
if err != nil && !skipMap.addSeenError(addr.Address.Container(), err) {
|
||||
p.log.Error(logs.PolicerUnableToProcessObj,
|
||||
zap.Stringer("object", addr.Address),
|
||||
zap.String("error", err.Error()))
|
||||
|
@ -72,3 +75,36 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type errMap struct {
|
||||
sync.Mutex
|
||||
skipMap map[cid.ID][]error
|
||||
}
|
||||
|
||||
func newSkipMap() *errMap {
|
||||
return &errMap{
|
||||
skipMap: make(map[cid.ID][]error),
|
||||
}
|
||||
}
|
||||
|
||||
// addSeenError marks err as seen error for the container.
|
||||
// Returns true is the error has already been added.
|
||||
func (m *errMap) addSeenError(cnr cid.ID, err error) bool {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
for _, e := range m.skipMap[cnr] {
|
||||
if errors.Is(err, e) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Restrict list length to avoid possible OOM if some random error is added in future.
|
||||
const maxErrListLength = 10
|
||||
|
||||
lst := m.skipMap[cnr]
|
||||
if len(lst) < maxErrListLength {
|
||||
m.skipMap[cnr] = append(lst, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue