Compare commits

..

17 commits

Author SHA1 Message Date
0530f04670 [#466] adm: Allow to download contracts from Gitea, with tests
Signed-off-by: Olga Konstantinova <kola43843@gmail.com>
2024-06-25 22:19:09 +03:00
80b581d499 [#466] adm: Allow to download contracts from Gitea
Signed-off-by: Olga Konstantinova <kola43843@gmail.com>
2024-02-08 21:07:49 +00:00
805862f4b7 [#956] node: Allow to reload goroutine pool sizes
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 15:49:43 +00:00
426cf58b98 [#956] node: Remove pool sizes from config struct
They are available through the pool methods and unused outside of the
function that sets them.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 15:49:43 +00:00
edbe06e07e [#956] policer/test: Reuse testPool helper
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 15:49:43 +00:00
cbfeb72466 [#956] policer: Remove WithMaxCapacity option
We already provide the pool and this argument is used only for
preallocation. No functional changes.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 15:49:43 +00:00
053a195ac2 [#968] adm: Allow concurrent epoch ticks
Previous fix was incomplete, there are two possible places for this
error to occur.

Refs #592

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 08:10:24 +00:00
cfc5ce7853 [#964] metabase: Drop GC marks if object not found
GC inhumes expired locks and tombstones on all the shards.
So it could be GC mark without object.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-02-08 07:54:39 +00:00
c3fa902780 [#969] policer: Restrict the number of remembered errors
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 10:10:41 +03:00
6010dfdf3d [#969] policer: Make error skip thread-safe
Introduces in afd2ba9a66.
Refs #914

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-08 10:10:41 +03:00
a6c9a337cd [#965] morph: Get rid of container.List invocations
ContainersOf() is better in almost every aspect, besides creating a
session when the containers number is between 1024 and 2048 (prefetch
script does limited unwrapping). Making List() private helps to ensure
it is no longer used and can be safely removed in future.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-07 08:56:27 +00:00
b1a1b2107d [#909] cli: Make add-rule and list-rules recieve namespace param
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-02-07 06:54:41 +00:00
d7838790c6 [#917] dev: Extend launch.json example
Add storage and dev wallets to control authorized keys list.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-02-06 14:49:47 +03:00
20b4447df7 [#917] docs: Extend shard mode description
Add shards detach details.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-02-06 14:49:47 +03:00
9ba48c582d [#917] engine: Allow to detach shards
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-02-06 14:49:47 +03:00
4358d3c423 [#917] controlSvc: Add DetachShards handler
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-02-06 14:47:52 +03:00
afd2ba9a66 [#110] Add check for repeated error log in policer
processObject() returns 3 types of errors: container not found errors,
could not get container error and placement vector building error. Every
error will occur for all objects in container simultaneously, so we can
log each error once and safely ignore the rest.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-02-06 00:56:41 +03:00
38 changed files with 791 additions and 125 deletions

View file

@ -0,0 +1,110 @@
package morph
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"code.gitea.io/sdk/gitea"
"github.com/spf13/cobra"
)
func downloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error) {
cmd.Printf("Downloading contracts archive from '%s'\n", url)
// HTTP client with connect timeout
client := http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
}).DialContext,
},
}
ctx, cancel := context.WithTimeout(cmd.Context(), 60*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("can't create request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("can't fetch contracts archive: %w", err)
}
return resp.Body, nil
}
type GiteaGlient interface {
ListReleases(owner, repo string, opt gitea.ListReleasesOptions) ([]*gitea.Release, *gitea.Response, error)
}
type GiteaRepo struct {
client GiteaGlient
}
func New() GiteaRepo {
client, _ := gitea.NewClient("https://git.frostfs.info")
return GiteaRepo{
client: client,
}
}
func (g *GiteaRepo) ListReleases(owner, repo string, opt gitea.ListReleasesOptions) ([]*gitea.Release, *gitea.Response, error) {
return g.client.ListReleases(owner, repo, opt)
}
type Downloader interface {
DownloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error)
}
type ContractDownloader struct{}
func (d *ContractDownloader) DownloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error) {
return downloadContracts(cmd, url)
}
func downloadContractsFromRepository(cmd *cobra.Command) (io.ReadCloser, error) {
giteaRepo := new(GiteaRepo)
downloader := new(ContractDownloader)
return downloadContractsFromGitea(cmd, giteaRepo, downloader)
}
func downloadContractsFromGitea(cmd *cobra.Command, giteaRepo *GiteaRepo, downloader Downloader) (io.ReadCloser, error) {
releases, _, err := giteaRepo.ListReleases("TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{})
if err != nil {
return nil, fmt.Errorf("can't fetch release information: %w", err)
}
var latestRelease *gitea.Release
for _, r := range releases {
if !r.IsDraft && !r.IsPrerelease {
latestRelease = r
break
}
}
if latestRelease == nil {
return nil, fmt.Errorf("attempt to fetch contracts archive from the official repository failed: no releases found")
}
var url string
for _, a := range latestRelease.Attachments {
if strings.HasPrefix(a.Name, "frostfs-contract") {
url = a.DownloadURL
break
}
}
if url == "" {
return nil, errors.New("can't find contracts archive in the latest release")
}
return downloader.DownloadContracts(cmd, url)
}

View file

@ -0,0 +1,186 @@
package morph
import (
"bytes"
"code.gitea.io/sdk/gitea"
"fmt"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"io"
"testing"
)
type MockGiteaRepo struct {
mock.Mock
*gitea.Client
}
func (m *MockGiteaRepo) ListReleases(owner, repo string, opt gitea.ListReleasesOptions) ([]*gitea.Release, *gitea.Response, error) {
args := m.Called(owner, repo, opt)
return args.Get(0).([]*gitea.Release), args.Get(1).(*gitea.Response), args.Error(2)
}
type MockDownloader struct {
mock.Mock
}
func (m *MockDownloader) DownloadContracts(cmd *cobra.Command, url string) (io.ReadCloser, error) {
args := m.Called(cmd, url)
return args.Get(0).(io.ReadCloser), args.Error(1)
}
func TestGiteaRepo_DownloadContracts_ListReleasesFailure(t *testing.T) {
m := MockGiteaRepo{}
giteaRepo := GiteaRepo{
client: &m,
}
m.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
Return([]*gitea.Release{}, &gitea.Response{Response: nil}, fmt.Errorf("failed"))
res, err := downloadContractsFromGitea(nil, &giteaRepo, nil)
assert.Nil(t, res, "result must be nil")
require.Error(t, err, "error must be provided")
assert.EqualError(t, err, "can't fetch release information: failed")
}
func TestGiteaRepo_DownloadContracts_ListReleasesEmpty(t *testing.T) {
m := MockGiteaRepo{}
giteaRepo := GiteaRepo{
client: &m,
}
m.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
Return([]*gitea.Release{}, &gitea.Response{Response: nil}, nil)
res, err := downloadContractsFromGitea(nil, &giteaRepo, nil)
assert.Nil(t, res, "result must be nil")
require.Error(t, err, "error must be provided")
assert.EqualError(t, err, "attempt to fetch contracts archive from the official repository failed: no releases found")
}
func TestGiteaRepo_DownloadContracts_ListReleasesOnlyDraftsAndPrereleases(t *testing.T) {
mockRepo := MockGiteaRepo{}
giteaRepo := GiteaRepo{
client: &mockRepo,
}
mockDownloader := MockDownloader{}
attachment := gitea.Attachment{
ID: 1,
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/frostfs-contract-v0.19.2.tar.gz",
Name: "frostfs-contract",
}
draft := gitea.Release{
ID: 1,
Title: "Draft",
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2",
IsDraft: true,
IsPrerelease: false,
Attachments: []*gitea.Attachment{&attachment},
}
prerelease := gitea.Release{
ID: 2,
Title: "Prerelease",
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.20.1",
IsDraft: false,
IsPrerelease: true,
}
cmd := cobra.Command{}
mockRepo.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
Return([]*gitea.Release{&draft, &prerelease}, &gitea.Response{Response: nil}, nil)
mockDownloader.On("DownloadContracts", &cmd, attachment.DownloadURL).
Return(io.NopCloser(bytes.NewReader([]byte("data"))), nil)
res, err := downloadContractsFromGitea(&cmd, &giteaRepo, &mockDownloader)
assert.Nil(t, res, "result must be nil")
require.Error(t, err, "error must be provided")
assert.EqualError(t, err, "attempt to fetch contracts archive from the offitial repository failed: no releases found")
}
func TestGiteaRepo_DownloadContracts_ListReleasesWrongAttachments(t *testing.T) {
mockRepo := MockGiteaRepo{}
giteaRepo := GiteaRepo{
client: &mockRepo,
}
attachment1 := gitea.Attachment{
ID: 1,
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/incorrect-name-v0.19.2.tar.gz",
Name: "incorrect-name-1",
}
attachment2 := gitea.Attachment{
ID: 2,
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/incorrect-name-v0.19.3.tar.gz",
Name: "incorrect-name-2",
}
release := gitea.Release{
ID: 1,
Title: "Title",
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2",
IsDraft: false,
IsPrerelease: false,
Attachments: []*gitea.Attachment{&attachment1, &attachment2},
}
cmd := cobra.Command{}
mockRepo.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
Return([]*gitea.Release{&release}, &gitea.Response{Response: nil}, nil)
res, err := downloadContractsFromGitea(&cmd, &giteaRepo, nil)
assert.Nil(t, res, "result must be nil")
require.Error(t, err, "error must be provided")
assert.EqualError(t, err, "can't find contracts archive in the latest release")
}
func TestGiteaRepo_DownloadContracts_Success(t *testing.T) {
mockRepo := MockGiteaRepo{}
giteaRepo := GiteaRepo{
client: &mockRepo,
}
mockDownloader := MockDownloader{}
attachment1 := gitea.Attachment{
ID: 1,
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/incorrect-name-v0.19.2.tar.gz",
Name: "incorrect-name-1",
}
attachment2 := gitea.Attachment{
ID: 2,
DownloadURL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v0.19.2/frostfs-contract-v0.19.2.tar.gz",
Name: "frostfs-contract",
}
release := gitea.Release{
ID: 1,
Title: "Title",
URL: "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2",
IsDraft: false,
IsPrerelease: false,
Attachments: []*gitea.Attachment{&attachment1, &attachment2},
}
cmd := cobra.Command{}
mockRepo.On("ListReleases", "TrueCloudLab", "frostfs-contract", gitea.ListReleasesOptions{}).
Return([]*gitea.Release{&release}, &gitea.Response{Response: nil}, nil)
mockDownloader.On("DownloadContracts", &cmd, attachment2.DownloadURL).
Return(io.NopCloser(bytes.NewReader([]byte("data"))), nil)
res, err := downloadContractsFromGitea(&cmd, &giteaRepo, &mockDownloader)
assert.NotNil(t, res, "result must not be nil")
require.NoError(t, err, "error must not be provided")
mockDownloader.AssertCalled(t, "DownloadContracts", &cmd, attachment2.DownloadURL)
}

View file

@ -37,18 +37,14 @@ func forceNewEpochCmd(cmd *cobra.Command, _ []string) error {
return err
}
if err := wCtx.sendConsensusTx(bw.Bytes()); err != nil {
return err
if err = wCtx.sendConsensusTx(bw.Bytes()); err == nil {
err = wCtx.awaitTx()
}
if err := wCtx.awaitTx(); err != nil {
if strings.Contains(err.Error(), "invalid epoch") {
cmd.Println("Epoch has already ticked.")
return nil
}
return err
if err != nil && strings.Contains(err.Error(), "invalid epoch") {
cmd.Println("Epoch has already ticked.")
return nil
}
return nil
return err
}
func emitNewEpochCall(bw *io.BufBinWriter, wCtx *initializeContext, nmHash util.Uint160) error {

View file

@ -50,6 +50,7 @@ type initializeContext struct {
Contracts map[string]*contractState
Command *cobra.Command
ContractPath string
ContractURL string
}
var ErrTooManyAlphabetNodes = fmt.Errorf("too many alphabet nodes (maximum allowed is %d)", maxAlphabetNodes)
@ -152,6 +153,11 @@ func newInitializeContext(cmd *cobra.Command, v *viper.Viper) (*initializeContex
return nil, err
}
var ctrURL string
if needContracts {
ctrURL, _ = cmd.Flags().GetString(contractsURLFlag)
}
if err := checkNotaryEnabled(c); err != nil {
return nil, err
}
@ -176,6 +182,7 @@ func newInitializeContext(cmd *cobra.Command, v *viper.Viper) (*initializeContex
Command: cmd,
Contracts: make(map[string]*contractState),
ContractPath: ctrPath,
ContractURL: ctrURL,
}
if needContracts {

View file

@ -399,10 +399,14 @@ func (c *initializeContext) readContracts(names []string) error {
}
} else {
var r io.ReadCloser
if c.ContractPath == "" {
return errors.New("contracts flag is missing")
if c.ContractPath != "" {
r, err = os.Open(c.ContractPath)
} else if c.ContractURL != "" {
r, err = downloadContracts(c.Command, c.ContractURL)
} else {
r, err = downloadContractsFromRepository(c.Command)
}
r, err = os.Open(c.ContractPath)
if err != nil {
return fmt.Errorf("can't open contracts archive: %w", err)
}

View file

@ -17,6 +17,9 @@ const (
storageGasCLIFlag = "initial-gas"
storageGasConfigFlag = "storage.initial_gas"
contractsInitFlag = "contracts"
contractsInitFlagDesc = "Path to archive with compiled FrostFS contracts (the default is to fetch the latest release from the official repository)"
contractsURLFlag = "contracts-url"
contractsURLFlagDesc = "URL to archive with compiled FrostFS contracts"
maxObjectSizeInitFlag = "network.max_object_size"
maxObjectSizeCLIFlag = "max-object-size"
epochDurationInitFlag = "network.epoch_duration"
@ -370,8 +373,9 @@ func initUpdateContractsCmd() {
RootCmd.AddCommand(updateContractsCmd)
updateContractsCmd.Flags().String(alphabetWalletsFlag, "", alphabetWalletsFlagDesc)
updateContractsCmd.Flags().StringP(endpointFlag, endpointFlagShort, "", endpointFlagDesc)
updateContractsCmd.Flags().String(contractsInitFlag, "", "Path to archive with compiled FrostFS contracts")
_ = updateContractsCmd.MarkFlagRequired(contractsInitFlag)
updateContractsCmd.Flags().String(contractsInitFlag, "", contractsInitFlagDesc)
updateContractsCmd.Flags().String(contractsURLFlag, "", contractsURLFlagDesc)
updateContractsCmd.MarkFlagsMutuallyExclusive(contractsInitFlag, contractsURLFlag)
}
func initDumpBalancesCmd() {
@ -441,8 +445,8 @@ func initInitCmd() {
RootCmd.AddCommand(initCmd)
initCmd.Flags().String(alphabetWalletsFlag, "", alphabetWalletsFlagDesc)
initCmd.Flags().StringP(endpointFlag, endpointFlagShort, "", endpointFlagDesc)
initCmd.Flags().String(contractsInitFlag, "", "Path to archive with compiled FrostFS contracts")
_ = initCmd.MarkFlagRequired(contractsInitFlag)
initCmd.Flags().String(contractsInitFlag, "", contractsInitFlagDesc)
initCmd.Flags().String(contractsURLFlag, "", contractsURLFlagDesc)
initCmd.Flags().Uint(epochDurationCLIFlag, 240, "Amount of side chain blocks in one FrostFS epoch")
initCmd.Flags().Uint(maxObjectSizeCLIFlag, 67108864, "Max single object size in bytes")
initCmd.Flags().Bool(homomorphicHashDisabledCLIFlag, false, "Disable object homomorphic hashing")
@ -451,6 +455,7 @@ func initInitCmd() {
initCmd.Flags().Uint64(containerAliasFeeCLIFlag, 500, "Container alias fee")
initCmd.Flags().String(protoConfigPath, "", "Path to the consensus node configuration")
initCmd.Flags().String(localDumpFlag, "", "Path to the blocks dump file")
initCmd.MarkFlagsMutuallyExclusive(contractsInitFlag, contractsURLFlag)
}
func initGenerateAlphabetCmd() {

View file

@ -1,16 +1,13 @@
package control
import (
"crypto/sha256"
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/util"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/spf13/cobra"
)
@ -33,6 +30,8 @@ var addRuleCmd = &cobra.Command{
func addRule(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
target := parseTarget(cmd)
chainID, _ := cmd.Flags().GetString(chainIDFlag)
hexEncoded, _ := cmd.Flags().GetBool(chainIDHexFlag)
@ -44,13 +43,6 @@ func addRule(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "can't decode chain ID as hex: %w", err)
}
var cnr cid.ID
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
rawCID := make([]byte, sha256.Size)
cnr.Encode(rawCID)
rule, _ := cmd.Flags().GetStringArray(ruleFlag)
chain := new(apechain.Chain)
@ -63,11 +55,8 @@ func addRule(cmd *cobra.Command, _ []string) {
req := &control.AddChainLocalOverrideRequest{
Body: &control.AddChainLocalOverrideRequest_Body{
Target: &control.ChainTarget{
Type: control.ChainTarget_CONTAINER,
Name: cidStr,
},
Chain: serializedChain,
Target: target,
Chain: serializedChain,
},
}
@ -91,8 +80,10 @@ func initControlAddRuleCmd() {
initControlFlags(addRuleCmd)
ff := addRuleCmd.Flags()
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
ff.StringArray(ruleFlag, []string{}, "Rule statement")
ff.String(chainIDFlag, "", "Assign ID to the parsed chain")
ff.String(targetNameFlag, "", targetNameDesc)
ff.String(targetTypeFlag, "", targetTypeDesc)
_ = addRuleCmd.MarkFlagRequired(targetTypeFlag)
ff.Bool(chainIDHexFlag, false, "Flag to parse chain ID as hex")
}

View file

@ -0,0 +1,49 @@
package control
import (
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/spf13/cobra"
)
var shardsDetachCmd = &cobra.Command{
Use: "detach",
Short: "Detach and close the shards",
Long: "Detach and close the shards",
Run: shardsDetach,
}
func shardsDetach(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.DetachShardsRequest{
Body: &control.DetachShardsRequest_Body{
Shard_ID: getShardIDListFromIDFlag(cmd, false),
},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.DetachShardsResponse
var err error
err = cli.ExecRaw(func(client *rawclient.Client) error {
resp, err = control.DetachShards(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Shard mode update request successfully sent.")
}
func initControlShardsDetachCmd() {
initControlFlags(shardsDetachCmd)
flags := shardsDetachCmd.Flags()
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
}

View file

@ -1,16 +1,13 @@
package control
import (
"crypto/sha256"
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/util"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/spf13/cobra"
)
@ -25,12 +22,7 @@ var getRuleCmd = &cobra.Command{
func getRule(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
var cnr cid.ID
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
rawCID := make([]byte, sha256.Size)
cnr.Encode(rawCID)
target := parseTarget(cmd)
chainID, _ := cmd.Flags().GetString(chainIDFlag)
hexEncoded, _ := cmd.Flags().GetBool(chainIDHexFlag)
@ -43,10 +35,7 @@ func getRule(cmd *cobra.Command, _ []string) {
req := &control.GetChainLocalOverrideRequest{
Body: &control.GetChainLocalOverrideRequest_Body{
Target: &control.ChainTarget{
Name: cidStr,
Type: control.ChainTarget_CONTAINER,
},
Target: target,
ChainId: []byte(chainID),
},
}
@ -74,7 +63,9 @@ func initControGetRuleCmd() {
initControlFlags(getRuleCmd)
ff := getRuleCmd.Flags()
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
ff.String(targetNameFlag, "", targetNameDesc)
ff.String(targetTypeFlag, "", targetTypeDesc)
_ = getRuleCmd.MarkFlagRequired(targetTypeFlag)
ff.String(chainIDFlag, "", "Chain id")
ff.Bool(chainIDHexFlag, false, "Flag to parse chain ID as hex")
}

View file

@ -1,16 +1,17 @@
package control
import (
"crypto/sha256"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/util"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/spf13/cobra"
)
@ -21,22 +22,56 @@ var listRulesCmd = &cobra.Command{
Run: listRules,
}
const (
defaultNamespace = "root"
namespaceTarget = "namespace"
containerTarget = "container"
)
const (
targetNameFlag = "target-name"
targetNameDesc = "Resource name in APE resource name format"
targetTypeFlag = "target-type"
targetTypeDesc = "Resource type(container/namespace)"
)
func parseTarget(cmd *cobra.Command) *control.ChainTarget {
typ, _ := cmd.Flags().GetString(targetTypeFlag)
name, _ := cmd.Flags().GetString(targetNameFlag)
switch typ {
case namespaceTarget:
if name == "" {
ln, err := input.ReadLine(fmt.Sprintf("Target name is not set. Confirm to use %s namespace (n|Y)> ", defaultNamespace))
commonCmd.ExitOnErr(cmd, "read line error: %w", err)
ln = strings.ToLower(ln)
if len(ln) > 0 && (ln[0] == 'n') {
commonCmd.ExitOnErr(cmd, "read namespace error: %w", fmt.Errorf("setting default value was declined"))
}
name = defaultNamespace
}
return &control.ChainTarget{
Name: name,
Type: control.ChainTarget_NAMESPACE,
}
case containerTarget:
var cnr cid.ID
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(name))
return &control.ChainTarget{
Name: name,
Type: control.ChainTarget_CONTAINER,
}
default:
commonCmd.ExitOnErr(cmd, "read target type error: %w", fmt.Errorf("unknown target type"))
}
return nil
}
func listRules(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
var cnr cid.ID
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
rawCID := make([]byte, sha256.Size)
cnr.Encode(rawCID)
req := &control.ListChainLocalOverridesRequest{
Body: &control.ListChainLocalOverridesRequest_Body{
Target: &control.ChainTarget{
Name: cidStr,
Type: control.ChainTarget_CONTAINER,
},
Target: parseTarget(cmd),
},
}
@ -71,5 +106,7 @@ func initControlListRulesCmd() {
initControlFlags(listRulesCmd)
ff := listRulesCmd.Flags()
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
ff.String(targetNameFlag, "", targetNameDesc)
ff.String(targetTypeFlag, "", targetTypeDesc)
_ = listRulesCmd.MarkFlagRequired(targetTypeFlag)
}

View file

@ -1,15 +1,12 @@
package control
import (
"crypto/sha256"
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/spf13/cobra"
)
@ -28,13 +25,6 @@ var removeRuleCmd = &cobra.Command{
func removeRule(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
var cnr cid.ID
cidStr, _ := cmd.Flags().GetString(commonflags.CIDFlag)
commonCmd.ExitOnErr(cmd, "can't decode container ID: %w", cnr.DecodeString(cidStr))
rawCID := make([]byte, sha256.Size)
cnr.Encode(rawCID)
chainID, _ := cmd.Flags().GetString(chainIDFlag)
hexEncoded, _ := cmd.Flags().GetBool(chainIDHexFlag)
@ -48,10 +38,7 @@ func removeRule(cmd *cobra.Command, _ []string) {
req := &control.RemoveChainLocalOverrideRequest{
Body: &control.RemoveChainLocalOverrideRequest_Body{
Target: &control.ChainTarget{
Name: cidStr,
Type: control.ChainTarget_CONTAINER,
},
Target: parseTarget(cmd),
ChainId: chainIDRaw,
},
}
@ -81,7 +68,9 @@ func initControlRemoveRuleCmd() {
initControlFlags(removeRuleCmd)
ff := removeRuleCmd.Flags()
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
ff.String(targetNameFlag, "", targetNameDesc)
ff.String(targetTypeFlag, "", targetTypeDesc)
_ = removeRuleCmd.MarkFlagRequired(targetTypeFlag)
ff.String(chainIDFlag, "", "Chain id")
ff.Bool(chainIDHexFlag, false, "Flag to parse chain ID as hex")
}

View file

@ -18,6 +18,7 @@ func initControlShardsCmd() {
shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd)
shardsCmd.AddCommand(writecacheShardCmd)
shardsCmd.AddCommand(shardsDetachCmd)
initControlShardsListCmd()
initControlSetShardModeCmd()
@ -26,4 +27,5 @@ func initControlShardsCmd() {
initControlFlushCacheCmd()
initControlDoctorCmd()
initControlShardsWritecacheCmd()
initControlShardsDetachCmd()
}

View file

@ -145,9 +145,17 @@ func getShardIDList(cmd *cobra.Command) [][]byte {
return nil
}
return getShardIDListFromIDFlag(cmd, true)
}
func getShardIDListFromIDFlag(cmd *cobra.Command, withAllFlag bool) [][]byte {
sidList, _ := cmd.Flags().GetStringSlice(shardIDFlag)
if len(sidList) == 0 {
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("either --%s or --%s flag must be provided", shardIDFlag, shardAllFlag))
if withAllFlag {
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("either --%s or --%s flag must be provided", shardIDFlag, shardAllFlag))
} else {
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("--%s flag value must be provided", shardIDFlag))
}
}
// We can sort the ID list and perform this check without additional allocations,

View file

@ -632,14 +632,8 @@ type cfgAccessPolicyEngine struct {
type cfgObjectRoutines struct {
putRemote *ants.Pool
putRemoteCapacity int
putLocal *ants.Pool
putLocalCapacity int
replicatorPoolSize int
replication *ants.Pool
}
@ -1094,20 +1088,20 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
optNonBlocking := ants.WithNonblocking(true)
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
fatalOnErr(err)
pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal()
pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking)
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
fatalOnErr(err)
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
if pool.replicatorPoolSize <= 0 {
pool.replicatorPoolSize = pool.putRemoteCapacity
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
if replicatorPoolSize <= 0 {
replicatorPoolSize = putRemoteCapacity
}
pool.replication, err = ants.NewPool(pool.replicatorPoolSize)
pool.replication, err = ants.NewPool(replicatorPoolSize)
fatalOnErr(err)
return pool
@ -1241,6 +1235,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
setRuntimeParameters(c)
return nil
}})
components = append(components, dCmp{"pools", c.reloadPools})
components = append(components, dCmp{"tracing", func() error {
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
if updated {
@ -1285,6 +1280,28 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) reloadPools() error {
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
newSize = replicatorconfig.PoolSize(c.appCfg)
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
return nil
}
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
oldSize := p.Cap()
if oldSize != newSize {
c.log.Info(logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
zap.Int("old", oldSize), zap.Int("new", newSize))
p.Tune(newSize)
}
}
func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive()
defer unlock()

View file

@ -210,7 +210,7 @@ type morphContainerReader struct {
src containerCore.Source
lister interface {
List(*user.ID) ([]cid.ID, error)
ContainersOf(*user.ID) ([]cid.ID, error)
}
}
@ -226,8 +226,8 @@ func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
return x.eacl.GetEACL(id)
}
func (x *morphContainerReader) List(id *user.ID) ([]cid.ID, error) {
return x.lister.List(id)
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
return x.lister.ContainersOf(id)
}
type morphContainerWriter struct {

View file

@ -263,7 +263,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
)
}
}),
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
)

View file

@ -47,6 +47,7 @@
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8081",
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",

View file

@ -46,3 +46,9 @@ Shard can automatically switch to a `degraded-read-only` mode in 3 cases:
1. If the metabase was not available or couldn't be opened/initialized during shard startup.
2. If shard error counter exceeds threshold.
3. If the metabase couldn't be reopened during SIGHUP handling.
# Detach shard
To detach a shard use `frostfs-cli control shards detach` command. This command removes the shards from the storage
engine and closes all resources associated with the shards.
Limitation: `SIGHUP` or storage node restart lead to detached shard will be again online.

7
go.mod
View file

@ -3,6 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.20
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240112150928-72885aae835c
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.1-0.20240115082915-f2a82aa635aa
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
@ -29,7 +30,7 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.8
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
@ -61,8 +62,10 @@ require (
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davidmz/go-pageant v1.0.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-fed/httpsig v1.1.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
@ -71,6 +74,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
@ -103,6 +107,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
github.com/twmb/murmur3 v1.1.8 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -451,6 +451,7 @@ const (
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
FrostFSNodeUpdatedConfigurationApplying = "updated configuration applying"
FrostFSNodeConfigurationHasBeenReloadedSuccessfully = "configuration has been reloaded successfully"
FrostFSNodeReadNewlyCreatedContainerAfterTheNotification = "read newly created container after the notification"

View file

@ -2,7 +2,9 @@ package engine
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -14,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var errShardNotFound = logicerr.New("shard not found")
@ -344,6 +347,100 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
}
}
func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
if len(ids) == 0 {
return logicerr.New("ids must be non-empty")
}
deletedShards, err := e.deleteShards(ids)
if err != nil {
return err
}
return e.closeShards(deletedShards)
}
// closeShards closes deleted shards. Tries to close all shards.
// Returns single error with joined shard errors.
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
var multiErr error
var multiErrGuard sync.Mutex
var eg errgroup.Group
for _, sh := range deletedShards {
sh := sh
eg.Go(func() error {
err := sh.SetMode(mode.Disabled)
if err != nil {
e.log.Error(logs.EngineCouldNotChangeShardModeToDisabled,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
multiErrGuard.Lock()
multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err))
multiErrGuard.Unlock()
}
err = sh.Close()
if err != nil {
e.log.Error(logs.EngineCouldNotCloseRemovedShard,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
multiErrGuard.Lock()
multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err))
multiErrGuard.Unlock()
}
return nil
})
}
if err := eg.Wait(); err != nil {
return err
}
return multiErr
}
// deleteShards deletes shards with specified ids from engine shard list
// and releases all engine resources associated with shards.
// Returns deleted shards or error if some shard could not be deleted.
func (e *StorageEngine) deleteShards(ids []*shard.ID) ([]hashedShard, error) {
ss := make([]hashedShard, 0, len(ids))
e.mtx.Lock()
defer e.mtx.Unlock()
for _, id := range ids {
idStr := id.String()
sh, found := e.shards[idStr]
if !found {
return nil, errShardNotFound
}
ss = append(ss, sh)
}
if len(ss) == len(e.shards) {
return nil, logicerr.New("could not delete all the shards")
}
for _, sh := range ss {
idStr := sh.ID().String()
sh.DeleteShardMetrics()
delete(e.shards, idStr)
pool, ok := e.shardPools[idStr]
if ok {
pool.Release()
delete(e.shardPools, idStr)
}
e.log.Info(logs.EngineShardHasBeenRemoved,
zap.String("id", idStr))
}
return ss, nil
}
func (s hashedShard) Hash() uint64 {
return s.hash
}

View file

@ -4,6 +4,8 @@ import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"github.com/stretchr/testify/require"
)
@ -42,3 +44,21 @@ func TestRemoveShard(t *testing.T) {
require.True(t, ok != removed)
}
}
func TestDisableShards(t *testing.T) {
t.Parallel()
const numOfShards = 2
te := testNewEngine(t).setShardsNum(t, numOfShards)
e, ids := te.engine, te.shardIDs
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical))
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]}))
require.Equal(t, 1, len(e.shards))
}

View file

@ -254,9 +254,18 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
// unmarshal object, work only with physically stored (raw == true) objects
obj, err := db.get(tx, addr, key, false, true, currEpoch)
if err != nil {
if client.IsErrObjectNotFound(err) {
addrKey = addressKey(addr, key)
if garbageBKT != nil {
err := garbageBKT.Delete(addrKey)
if err != nil {
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
}
}
return deleteSingleResult{}, nil
}
var siErr *objectSDK.SplitInfoError
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
if errors.As(err, &siErr) {
// if object is virtual (parent) then do nothing, it will be deleted with last child
return deleteSingleResult{}, nil
}

View file

@ -182,6 +182,37 @@ func TestDelete(t *testing.T) {
require.Equal(t, 0, len(addrs))
}
func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
defer func() { require.NoError(t, db.Close()) }()
addr := oidtest.Address()
var prm meta.InhumePrm
prm.SetAddresses(addr)
prm.SetGCMark()
_, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
var garbageCount int
var itPrm meta.GarbageIterationPrm
itPrm.SetHandler(func(g meta.GarbageObject) error {
garbageCount++
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 1, garbageCount)
var delPrm meta.DeletePrm
delPrm.SetAddresses(addr)
_, err = db.Delete(context.Background(), delPrm)
require.NoError(t, err)
garbageCount = 0
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 0, garbageCount)
}
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(addrs...)

View file

@ -53,7 +53,7 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID)
if err != nil {
if errors.Is(err, unwrap.ErrNoSessionID) {
return c.List(idUser)
return c.list(idUser)
}
return nil, err
}

View file

@ -8,13 +8,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
// List returns a list of container identifiers belonging
// list returns a list of container identifiers belonging
// to the specified user of FrostFS system. The list is composed
// through Container contract call.
//
// Returns the identifiers of all FrostFS containers if pointer
// to user identifier is nil.
func (c *Client) List(idUser *user.ID) ([]cid.ID, error) {
func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
var rawID []byte
if idUser != nil {

View file

@ -26,10 +26,10 @@ type Reader interface {
containercore.Source
containercore.EACLSource
// List returns a list of container identifiers belonging
// ContainersOf returns a list of container identifiers belonging
// to the specified user of FrostFS system. Returns the identifiers
// of all FrostFS containers if pointer to owner identifier is nil.
List(*user.ID) ([]cid.ID, error)
ContainersOf(*user.ID) ([]cid.ID, error)
}
// Writer is an interface of container storage updater.
@ -187,7 +187,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return nil, fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.List(&id)
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return nil, err
}

View file

@ -26,6 +26,7 @@ const (
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
rpcSealWriteCache = "SealWriteCache"
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
rpcDetachShards = "DetachShards"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -292,3 +293,22 @@ func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...clie
return wResp.message, nil
}
// DetachShards executes ControlService.DetachShards RPC.
func DetachShards(
cli *client.Client,
req *DetachShardsRequest,
opts ...client.CallOption,
) (*DetachShardsResponse, error) {
wResp := newResponseWrapper[DetachShardsResponse]()
wReq := &requestWrapper{
m: req,
}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDetachShards), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}

View file

@ -0,0 +1,37 @@
package control
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
if err := s.s.DetachShards(shardIDs); err != nil {
if errors.As(err, new(logicerr.Logical)) {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.DetachShardsResponse{
Body: &control.DetachShardsResponse_Body{},
}
if err = SignMessage(s.key, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

Binary file not shown.

View file

@ -62,6 +62,9 @@ service ControlService {
// Flush objects from write-cache and move it to degraded read only mode.
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
// DetachShards detaches and closes shards.
rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
}
// Health check request.
@ -584,3 +587,21 @@ message SealWriteCacheResponse {
Signature signature = 2;
}
message DetachShardsRequest {
message Body {
repeated bytes shard_ID = 1;
}
Body body = 1;
Signature signature = 2;
}
message DetachShardsResponse {
message Body {
}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.

View file

@ -64,8 +64,6 @@ type cfg struct {
taskPool *ants.Pool
maxCapacity int
batchSize, cacheSize uint32
rebalanceFreq, evictDuration, sleepDuration time.Duration
@ -158,14 +156,6 @@ func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
}
}
// WithMaxCapacity returns option to set max capacity
// that can be set to the pool.
func WithMaxCapacity(capacity int) Option {
return func(c *cfg) {
c.maxCapacity = capacity
}
}
// WithPool returns option to set pool for
// policy and replication operations.
func WithPool(p *ants.Pool) Option {

View file

@ -66,7 +66,7 @@ func New(opts ...Option) *Policer {
cfg: c,
cache: cache,
objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity),
objs: make(map[oid.Address]struct{}, c.taskPool.Cap()),
},
}
}

View file

@ -48,16 +48,12 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
return nil
}
// Task pool
pool, err := ants.NewPool(4)
require.NoError(t, err)
// Policer instance
p := New(
WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
WithPool(testPool(t)),
)
ctx, cancel := context.WithCancel(context.Background())
@ -239,6 +235,7 @@ func TestProcessObject(t *testing.T) {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
}
})),
WithPool(testPool(t)),
)
addrWithType := objectcore.AddressWithType{
@ -276,6 +273,7 @@ func TestProcessObjectError(t *testing.T) {
p := New(
WithContainerSource(source),
WithBuryFunc(buryFn),
WithPool(testPool(t)),
)
addrWithType := objectcore.AddressWithType{
@ -296,9 +294,6 @@ func TestIteratorContract(t *testing.T) {
return nil
}
pool, err := ants.NewPool(4)
require.NoError(t, err)
it := &predefinedIterator{
scenario: []nextResult{
{objs, nil},
@ -324,7 +319,7 @@ func TestIteratorContract(t *testing.T) {
WithKeySpaceIterator(it),
WithContainerSource(containerSrc),
WithBuryFunc(buryFn),
WithPool(pool),
WithPool(testPool(t)),
func(c *cfg) {
c.sleepDuration = time.Millisecond
},
@ -348,6 +343,12 @@ func TestIteratorContract(t *testing.T) {
}, it.calls)
}
func testPool(t *testing.T) *ants.Pool {
pool, err := ants.NewPool(4)
require.NoError(t, err)
return pool
}
type nextResult struct {
objs []objectcore.AddressWithType
err error

View file

@ -3,10 +3,12 @@ package policer
import (
"context"
"errors"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
@ -34,6 +36,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err))
}
skipMap := newSkipMap()
for i := range addrs {
select {
case <-ctx.Done():
@ -55,7 +58,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
if p.objsInWork.add(addr.Address) {
err := p.processObject(ctx, addr)
if err != nil {
if err != nil && !skipMap.addSeenError(addr.Address.Container(), err) {
p.log.Error(logs.PolicerUnableToProcessObj,
zap.Stringer("object", addr.Address),
zap.String("error", err.Error()))
@ -72,3 +75,36 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
}
}
}
type errMap struct {
sync.Mutex
skipMap map[cid.ID][]error
}
func newSkipMap() *errMap {
return &errMap{
skipMap: make(map[cid.ID][]error),
}
}
// addSeenError marks err as seen error for the container.
// Returns true is the error has already been added.
func (m *errMap) addSeenError(cnr cid.ID, err error) bool {
m.Lock()
defer m.Unlock()
for _, e := range m.skipMap[cnr] {
if errors.Is(err, e) {
return true
}
}
// Restrict list length to avoid possible OOM if some random error is added in future.
const maxErrListLength = 10
lst := m.skipMap[cnr]
if len(lst) < maxErrListLength {
m.skipMap[cnr] = append(lst, err)
}
return false
}