Compare commits

..

No commits in common. "master" and "fix/wc_drop_compressed" have entirely different histories.

171 changed files with 1538 additions and 2477 deletions

View file

@ -106,6 +106,4 @@ jobs:
run: make fumpt-install
- name: Run gofumpt
run: |
make fumpt
git diff --exit-code --quiet
run: make fumpt

View file

@ -38,10 +38,6 @@ linters-settings:
alias:
pkg: git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object
alias: objectSDK
unused:
field-writes-are-uses: false
exported-fields-are-used: false
local-variables-are-used: false
custom:
truecloudlab-linters:
path: bin/linters/external_linters.so

View file

@ -4,7 +4,7 @@ SHELL = bash
REPO ?= $(shell go list -m)
VERSION ?= $(shell git describe --tags --dirty --match "v*" --always --abbrev=8 2>/dev/null || cat VERSION 2>/dev/null || echo "develop")
HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
HUB_IMAGE ?= truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.22

View file

@ -7,8 +7,9 @@
</p>
---
[![Report](https://goreportcard.com/badge/git.frostfs.info/TrueCloudLab/frostfs-node)](https://goreportcard.com/report/git.frostfs.info/TrueCloudLab/frostfs-node)
![Release (latest)](https://git.frostfs.info/TrueCloudLab/frostfs-node/badges/release.svg)
[![Report](https://goreportcard.com/badge/github.com/TrueCloudLab/frostfs-node)](https://goreportcard.com/report/github.com/TrueCloudLab/frostfs-node)
![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/TrueCloudLab/frostfs-node?sort=semver)
![License](https://img.shields.io/github/license/TrueCloudLab/frostfs-node.svg?style=popout)
# Overview
@ -32,8 +33,8 @@ manipulate large amounts of data without paying a prohibitive price.
FrostFS has a native [gRPC API](https://git.frostfs.info/TrueCloudLab/frostfs-api) and has
protocol gateways for popular protocols such as [AWS
S3](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw),
[HTTP](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw),
S3](https://github.com/TrueCloudLab/frostfs-s3-gw),
[HTTP](https://github.com/TrueCloudLab/frostfs-http-gw),
[FUSE](https://wikipedia.org/wiki/Filesystem_in_Userspace) and
[sFTP](https://en.wikipedia.org/wiki/SSH_File_Transfer_Protocol) allowing
developers to integrate applications without rewriting their code.
@ -44,7 +45,7 @@ Now, we only support GNU/Linux on amd64 CPUs with AVX/AVX2 instructions. More
platforms will be officially supported after release `1.0`.
The latest version of frostfs-node works with frostfs-contract
[v0.19.2](https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/tag/v0.19.2).
[v0.16.0](https://github.com/TrueCloudLab/frostfs-contract/releases/tag/v0.16.0).
# Building
@ -70,7 +71,7 @@ make docker/bin/frostfs-<name> # build a specific binary
## Docker images
To make docker images suitable for use in [frostfs-dev-env](https://git.frostfs.info/TrueCloudLab/frostfs-dev-env/) use:
To make docker images suitable for use in [frostfs-dev-env](https://github.com/TrueCloudLab/frostfs-dev-env/) use:
```
make images
```
@ -124,7 +125,7 @@ the feature/topic you are going to implement.
# Credits
FrostFS is maintained by [True Cloud Lab](https://git.frostfs.info/TrueCloudLab/) with the help and
FrostFS is maintained by [True Cloud Lab](https://github.com/TrueCloudLab/) with the help and
contributions from community members.
Please see [CREDITS](CREDITS.md) for details.

View file

@ -9,8 +9,8 @@ related configuration details.
To follow this guide you need:
- latest released version of [neo-go](https://github.com/nspcc-dev/neo-go/releases) (v0.97.2 at the moment),
- latest released version of [frostfs-adm](https://git.frostfs.info/TrueCloudLab/frostfs-node/releases) utility (v0.42.9 at the moment),
- latest released version of compiled [frostfs-contract](https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases) (v0.19.2 at the moment).
- latest released version of [frostfs-adm](https://github.com/TrueCloudLab/frostfs-node/releases) utility (v0.25.1 at the moment),
- latest released version of compiled [frostfs-contract](https://github.com/TrueCloudLab/frostfs-contract/releases) (v0.11.0 at the moment).
## Step 1: Prepare network configuration

View file

@ -39,5 +39,4 @@ const (
CustomZoneFlag = "domain"
AlphabetSizeFlag = "size"
AllFlag = "all"
DeltaFlag = "delta"
)

View file

@ -72,17 +72,13 @@ func InvalidConfigValueErr(key string) error {
return fmt.Errorf("invalid %s config value from netmap contract", key)
}
func EmitNewEpochCall(bw *io.BufBinWriter, wCtx *InitializeContext, nmHash util.Uint160, countEpoch int64) error {
if countEpoch <= 0 {
return errors.New("number of epochs cannot be less than 1")
}
func EmitNewEpochCall(bw *io.BufBinWriter, wCtx *InitializeContext, nmHash util.Uint160) error {
curr, err := unwrap.Int64(wCtx.ReadOnlyInvoker.Call(nmHash, "epoch"))
if err != nil {
return errors.New("can't fetch current epoch from the netmap contract")
}
newEpoch := curr + countEpoch
newEpoch := curr + 1
wCtx.Command.Printf("Current epoch: %d, increase to %d.\n", curr, newEpoch)
// In NeoFS this is done via Notary contract. Here, however, we can form the

View file

@ -1,8 +1,6 @@
package initialize
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/io"
@ -31,14 +29,10 @@ func setNotaryAndAlphabetNodes(c *helper.InitializeContext) error {
callflag.States|callflag.AllowNotify, int64(noderoles.NeoFSAlphabet), pubs)
if err := c.SendCommitteeTx(w.Bytes(), false); err != nil {
return fmt.Errorf("send committee transaction: %w", err)
return err
}
err := c.AwaitTx()
if err != nil {
err = fmt.Errorf("await committee transaction: %w", err)
}
return err
return c.AwaitTx()
}
func setRolesFinished(c *helper.InitializeContext) (bool, error) {

View file

@ -3,7 +3,6 @@ package initialize
import (
"fmt"
"math/big"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
@ -145,17 +144,5 @@ func createNEP17MultiTransferTx(c helper.Client, acc *wallet.Account, recipients
if err != nil {
return nil, fmt.Errorf("can't create actor: %w", err)
}
tx, err := act.MakeRun(w.Bytes())
if err != nil {
sum := make(map[util.Uint160]int64)
for _, recipient := range recipients {
sum[recipient.Token] += recipient.Amount
}
detail := make([]string, 0, len(sum))
for _, value := range sum {
detail = append(detail, fmt.Sprintf("amount=%v", value))
}
err = fmt.Errorf("transfer failed: from=%s(%s) %s: %w", acc.Label, acc.Address, strings.Join(detail, " "), err)
}
return tx, err
return act.MakeRun(w.Bytes())
}

View file

@ -4,7 +4,6 @@ import (
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"github.com/nspcc-dev/neo-go/pkg/io"
@ -31,7 +30,7 @@ func ForceNewEpochCmd(cmd *cobra.Command, _ []string) error {
}
bw := io.NewBufBinWriter()
if err := helper.EmitNewEpochCall(bw, wCtx, nmHash, viper.GetInt64(commonflags.DeltaFlag)); err != nil {
if err := helper.EmitNewEpochCall(bw, wCtx, nmHash); err != nil {
return err
}

View file

@ -22,7 +22,6 @@ var (
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.AlphabetWalletsFlag, cmd.Flags().Lookup(commonflags.AlphabetWalletsFlag))
_ = viper.BindPFlag(commonflags.EndpointFlag, cmd.Flags().Lookup(commonflags.EndpointFlag))
_ = viper.BindPFlag(commonflags.DeltaFlag, cmd.Flags().Lookup(commonflags.DeltaFlag))
},
RunE: ForceNewEpochCmd,
}
@ -36,7 +35,6 @@ func initForceNewEpochCmd() {
ForceNewEpoch.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
ForceNewEpoch.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
ForceNewEpoch.Flags().String(commonflags.LocalDumpFlag, "", "Path to the blocks dump file")
ForceNewEpoch.Flags().Int64(commonflags.DeltaFlag, 1, "Number of epochs to increase the current epoch")
}
func init() {

View file

@ -42,23 +42,3 @@ func registerDomain(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "register domain error: %w", err)
cmd.Println("Domain registered successfully")
}
func initDeleteCmd() {
Cmd.AddCommand(deleteCmd)
deleteCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
deleteCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
deleteCmd.Flags().String(nnsNameFlag, "", nnsNameFlagDesc)
_ = cobra.MarkFlagRequired(deleteCmd.Flags(), nnsNameFlag)
}
func deleteDomain(cmd *cobra.Command, _ []string) {
c, actor, _ := getRPCClient(cmd)
name, _ := cmd.Flags().GetString(nnsNameFlag)
h, vub, err := c.DeleteDomain(name)
_, err = actor.Wait(h, vub, err)
commonCmd.ExitOnErr(cmd, "delete domain error: %w", err)
cmd.Println("Domain deleted successfully")
}

View file

@ -42,15 +42,6 @@ var (
},
Run: registerDomain,
}
deleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete a domain by name",
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.EndpointFlag, cmd.Flags().Lookup(commonflags.EndpointFlag))
_ = viper.BindPFlag(commonflags.AlphabetWalletsFlag, cmd.Flags().Lookup(commonflags.AlphabetWalletsFlag))
},
Run: deleteDomain,
}
renewCmd = &cobra.Command{
Use: "renew",
Short: "Increases domain expiration date",
@ -100,7 +91,6 @@ var (
func init() {
initTokensCmd()
initRegisterCmd()
initDeleteCmd()
initRenewCmd()
initUpdateCmd()
initAddRecordCmd()

View file

@ -1,25 +1,15 @@
package nns
import (
"math/big"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
client "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"github.com/spf13/cobra"
)
const (
verboseDesc = "Include additional information about CNAME record."
)
func initTokensCmd() {
Cmd.AddCommand(tokensCmd)
tokensCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
tokensCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
tokensCmd.Flags().BoolP(commonflags.Verbose, commonflags.VerboseShorthand, false, verboseDesc)
}
func listTokens(cmd *cobra.Command, _ []string) {
@ -28,39 +18,7 @@ func listTokens(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "unable to get tokens: %w", err)
for toks, err := it.Next(10); err == nil && len(toks) > 0; toks, err = it.Next(10) {
for _, token := range toks {
output := string(token)
if verbose, _ := cmd.Flags().GetBool(commonflags.Verbose); verbose {
cname, err := getCnameRecord(c, token)
commonCmd.ExitOnErr(cmd, "", err)
if cname != "" {
output += " (CNAME: " + cname + ")"
}
}
cmd.Println(output)
cmd.Println(string(token))
}
}
}
func getCnameRecord(c *client.Contract, token []byte) (string, error) {
items, err := c.GetRecords(string(token), big.NewInt(int64(nns.CNAME)))
// GetRecords returns the error "not an array" if the domain does not contain records.
if err != nil && strings.Contains(err.Error(), "not an array") {
return "", nil
}
if err != nil {
return "", err
}
if len(items) == 0 {
return "", nil
}
record, err := items[0].TryBytes()
if err != nil {
return "", err
}
return string(record), nil
}

View file

@ -53,7 +53,7 @@ func RemoveNodesCmd(cmd *cobra.Command, args []string) error {
int64(netmapcontract.NodeStateOffline), nodeKeys[i].Bytes())
}
if err := helper.EmitNewEpochCall(bw, wCtx, nmHash, 1); err != nil {
if err := helper.EmitNewEpochCall(bw, wCtx, nmHash); err != nil {
return err
}

View file

@ -30,13 +30,11 @@ var (
func initProxyAddAccount() {
AddAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
AddAccountCmd.Flags().String(accountAddressFlag, "", "Wallet address string")
AddAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
}
func initProxyRemoveAccount() {
RemoveAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
RemoveAccountCmd.Flags().String(accountAddressFlag, "", "Wallet address string")
RemoveAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
}
func init() {

View file

@ -565,6 +565,13 @@ type HeadObjectPrm struct {
commonObjectPrm
objectAddressPrm
rawPrm
mainOnly bool
}
// SetMainOnlyFlag sets flag to get only main fields of an object header in terms of FrostFS API.
func (x *HeadObjectPrm) SetMainOnlyFlag(v bool) {
x.mainOnly = v
}
// HeadObjectRes groups the resulting values of HeadObject operation.

View file

@ -1,6 +1,9 @@
package container
import (
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
@ -81,8 +84,12 @@ var listContainersCmd = &cobra.Command{
cmd.Println(cnrID.String())
if flagVarListPrintAttr {
cnr.IterateUserAttributes(func(key, val string) {
cmd.Printf(" %s: %s\n", key, val)
cnr.IterateAttributes(func(key, val string) {
if !strings.HasPrefix(key, container.SysAttributePrefix) && !strings.HasPrefix(key, container.SysAttributePrefixNeoFS) {
// FIXME(@cthulhu-rider): https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/97
// Use dedicated method to skip system attributes.
cmd.Printf(" %s: %s\n", key, val)
}
})
}
}

View file

@ -1,6 +1,9 @@
package container
import (
"strings"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
@ -64,8 +67,14 @@ var listContainerObjectsCmd = &cobra.Command{
resHead, err := internalclient.HeadObject(cmd.Context(), prmHead)
if err == nil {
for _, attr := range resHead.Header().UserAttributes() {
cmd.Printf(" %s: %s\n", attr.Key(), attr.Value())
attrs := resHead.Header().Attributes()
for i := range attrs {
attrKey := attrs[i].Key()
if !strings.HasPrefix(attrKey, v2object.SysAttributePrefix) && !strings.HasPrefix(attrKey, v2object.SysAttributePrefixNeoFS) {
// FIXME(@cthulhu-rider): https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/97
// Use dedicated method to skip system attributes.
cmd.Printf(" %s: %s\n", attrKey, attrs[i].Value())
}
}
} else {
cmd.Printf(" failed to read attributes: %v\n", err)

View file

@ -20,10 +20,6 @@ const (
awaitFlag = "await"
noProgressFlag = "no-progress"
scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only"
containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count"
scopeAll = "all"
scopeObjects = "objects"
@ -68,18 +64,12 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)
req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
ContainerWorkerCount: containerWorkerCount,
ObjectWorkerCount: objectWorkerCount,
RepOneOnly: repOneOnly,
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
},
}
@ -381,9 +371,6 @@ func initControlStartEvacuationShardCmd() {
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -49,14 +49,14 @@ func prettyPrintNodeInfo(cmd *cobra.Command, i netmap.NodeInfo) {
cmd.Println("key:", hex.EncodeToString(i.PublicKey()))
var stateWord string
switch i.Status() {
switch {
default:
stateWord = "<undefined>"
case netmap.Online:
case i.IsOnline():
stateWord = "online"
case netmap.Offline:
case i.IsOffline():
stateWord = "offline"
case netmap.Maintenance:
case i.IsMaintenance():
stateWord = "maintenance"
}

View file

@ -38,6 +38,7 @@ func initObjectHeadCmd() {
_ = objectHeadCmd.MarkFlagRequired(commonflags.OIDFlag)
flags.String(fileFlag, "", "File to write header to. Default: stdout.")
flags.Bool("main-only", false, "Return only main fields")
flags.Bool(commonflags.JSON, false, "Marshal output in JSON")
flags.Bool("proto", false, "Marshal output in Protobuf")
flags.Bool(rawFlag, false, rawFlagDesc)
@ -48,6 +49,7 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
var obj oid.ID
objAddr := readObjectAddress(cmd, &cnr, &obj)
mainOnly, _ := cmd.Flags().GetBool("main-only")
pk := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
@ -60,6 +62,7 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
raw, _ := cmd.Flags().GetBool(rawFlag)
prm.SetRawFlag(raw)
prm.SetAddress(objAddr)
prm.SetMainOnlyFlag(mainOnly)
res, err := internalclient.HeadObject(cmd.Context(), prm)
if err != nil {

View file

@ -47,10 +47,9 @@ func add(cmd *cobra.Command, _ []string) {
meta, err := parseMeta(cmd)
commonCmd.ExitOnErr(cmd, "meta data parsing: %w", err)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -50,10 +50,9 @@ func addByPath(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -3,14 +3,13 @@ package tree
import (
"context"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@ -18,7 +17,7 @@ import (
// _client returns grpc Tree service client. Should be removed
// after making Tree API public.
func _client() (tree.TreeServiceClient, error) {
func _client(ctx context.Context) (tree.TreeServiceClient, error) {
var netAddr network.Address
err := netAddr.FromString(viper.GetString(commonflags.RPC))
if err != nil {
@ -26,6 +25,7 @@ func _client() (tree.TreeServiceClient, error) {
}
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
@ -40,14 +40,12 @@ func _client() (tree.TreeServiceClient, error) {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
// a default connection establishing timeout
const defaultClientConnectTimeout = time.Second * 2
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...)
cancel()
return tree.NewTreeServiceClient(cc), err
}
func contextWithTimeout(cmd *cobra.Command) (context.Context, context.CancelFunc) {
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
common.PrintVerbose(cmd, "Set request timeout to %s.", timeout)
return context.WithTimeout(cmd.Context(), timeout)
}
return context.WithTimeout(cmd.Context(), commonflags.TimeoutDefault)
}

View file

@ -50,10 +50,9 @@ func getByPath(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -44,10 +44,9 @@ func getOpLog(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -26,10 +26,9 @@ func initHealthcheckCmd() {
func healthcheck(cmd *cobra.Command, _ []string) {
pk := key.GetOrGenerate(cmd)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
req := &tree.HealthcheckRequest{

View file

@ -38,10 +38,9 @@ func list(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -45,10 +45,9 @@ func move(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -41,10 +41,9 @@ func remove(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -49,7 +49,6 @@ const (
heightFlagKey = "height"
countFlagKey = "count"
depthFlagKey = "depth"
orderFlagKey = "ordered"
)
func initCTID(cmd *cobra.Command) {

View file

@ -30,7 +30,6 @@ func initGetSubtreeCmd() {
ff := getSubtreeCmd.Flags()
ff.Uint64(rootIDFlagKey, 0, "Root ID to traverse from.")
ff.Uint32(depthFlagKey, 10, "Traversal depth.")
ff.Bool(orderFlagKey, false, "Sort output by ascending FileName.")
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
@ -46,10 +45,9 @@ func getSubTree(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
ctx := cmd.Context()
cli, err := _client()
cli, err := _client(ctx)
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)
@ -61,13 +59,6 @@ func getSubTree(cmd *cobra.Command, _ []string) {
depth, _ := cmd.Flags().GetUint32(depthFlagKey)
order, _ := cmd.Flags().GetBool(orderFlagKey)
bodyOrder := tree.GetSubTreeRequest_Body_Order_None
if order {
bodyOrder = tree.GetSubTreeRequest_Body_Order_Asc
}
var bt []byte
if t := common.ReadBearerToken(cmd, bearerFlagKey); t != nil {
bt = t.Marshal()
@ -80,9 +71,6 @@ func getSubTree(cmd *cobra.Command, _ []string) {
RootId: []uint64{rid},
Depth: depth,
BearerToken: bt,
OrderBy: &tree.GetSubTreeRequest_Body_Order{
Direction: bodyOrder,
},
},
}

View file

@ -41,8 +41,6 @@ func reloadConfig() error {
if err != nil {
return err
}
logPrm.PrependTimestamp = cfg.GetBool("logger.timestamp")
return logPrm.Reload()
}

View file

@ -9,7 +9,6 @@ import (
func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("logger.level", "info")
cfg.SetDefault("logger.destination", "stdout")
cfg.SetDefault("logger.timestamp", false)
setPprofDefaults(cfg)

View file

@ -13,7 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"github.com/spf13/viper"
"go.uber.org/zap"
)
@ -79,8 +78,6 @@ func main() {
)
exitErr(err)
logPrm.SamplingHook = metrics.LogMetrics().GetSamplingHook()
logPrm.PrependTimestamp = cfg.GetBool("logger.timestamp")
log, err = logger.NewLogger(logPrm)
exitErr(err)
@ -127,8 +124,4 @@ func shutdown() {
zap.String("error", err.Error()),
)
}
if err := sdnotify.ClearStatus(); err != nil {
log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
}
}

View file

@ -7,8 +7,6 @@ import (
)
type RawEntry struct {
// key and value used for record dump.
// nolint:unused
key, value []byte
}

View file

@ -16,8 +16,6 @@ type (
DefaultRecord struct {
addr oid.Address
// data used for record dump.
// nolint:unused
data []byte
}
)

View file

@ -25,7 +25,7 @@ func init() {
func inspectFunc(cmd *cobra.Command, _ []string) {
var data []byte
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close()

View file

@ -31,7 +31,7 @@ func listFunc(cmd *cobra.Command, _ []string) {
return err
}
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close()

View file

@ -102,7 +102,6 @@ type applicationConfiguration struct {
LoggerCfg struct {
level string
destination string
timestamp bool
}
EngineCfg struct {
@ -146,12 +145,15 @@ type shardCfg struct {
writecacheCfg struct {
enabled bool
path string
maxBatchSize int
maxBatchDelay time.Duration
smallObjectSize uint64
maxObjSize uint64
flushWorkerCount int
sizeLimit uint64
countLimit uint64
noSync bool
flushSizeLimit uint64
pageSize int
}
piloramaCfg struct {
@ -221,7 +223,6 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.LoggerCfg.level = loggerconfig.Level(c)
a.LoggerCfg.destination = loggerconfig.Destination(c)
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
// Storage Engine
@ -268,12 +269,15 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
wc.enabled = true
wc.path = writeCacheCfg.Path()
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.pageSize = writeCacheCfg.BoltDB().PageSize()
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
wc.sizeLimit = writeCacheCfg.SizeLimit()
wc.countLimit = writeCacheCfg.CountLimit()
wc.noSync = writeCacheCfg.NoSync()
wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
}
}
@ -602,6 +606,7 @@ type cfgNetmap struct {
needBootstrap bool
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
startEpoch uint64 // epoch number when application is started
}
type cfgNodeInfo struct {
@ -857,8 +862,11 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.path),
writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
writecache.WithMaxBatchSize(wcRead.maxBatchSize),
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
writecache.WithPageSize(wcRead.pageSize),
writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit),
writecache.WithMaxCacheCount(wcRead.countLimit),
@ -1024,7 +1032,6 @@ func (c *cfg) loggerPrm() (*logger.Prm, error) {
// not expected since validation should be performed before
panic("incorrect log destination format: " + c.LoggerCfg.destination)
}
c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
return c.dynamicConfiguration.logger, nil
}
@ -1081,6 +1088,7 @@ func initAccessPolicyEngine(_ context.Context, c *cfg) {
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
} else {
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
chainbase.WithLogger(c.log),
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
@ -1180,9 +1188,7 @@ func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error {
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
func bootstrapOnline(c *cfg) error {
return c.bootstrapWithState(func(ni *netmap.NodeInfo) {
ni.SetStatus(netmap.Online)
})
return c.bootstrapWithState((*netmap.NodeInfo).SetOnline)
}
// bootstrap calls bootstrapWithState with:
@ -1193,9 +1199,7 @@ func (c *cfg) bootstrap() error {
st := c.cfgNetmap.state.controlNetmapStatus()
if st == control.NetmapStatus_MAINTENANCE {
c.log.Info(logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
return c.bootstrapWithState(func(ni *netmap.NodeInfo) {
ni.SetStatus(netmap.Maintenance)
})
return c.bootstrapWithState((*netmap.NodeInfo).SetMaintenance)
}
c.log.Info(logs.FrostFSNodeBootstrappingWithOnlineState,
@ -1283,6 +1287,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
// all the components are expected to support
// Logger's dynamic reconfiguration approach
var components []dCmp
// Logger
@ -1292,7 +1297,34 @@ func (c *cfg) reloadConfig(ctx context.Context) {
return
}
components := c.getComponents(ctx, logPrm)
components = append(components, dCmp{"logger", logPrm.Reload})
components = append(components, dCmp{"runtime", func() error {
setRuntimeParameters(c)
return nil
}})
components = append(components, dCmp{"audit", func() error {
c.audit.Store(audit.Enabled(c.appCfg))
return nil
}})
components = append(components, dCmp{"pools", c.reloadPools})
components = append(components, dCmp{"tracing", func() error {
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
if updated {
c.log.Info(logs.FrostFSNodeTracingConfigationUpdated)
}
return err
}})
if cmp, updated := metricsComponent(c); updated {
if cmp.enabled {
cmp.preReload = enableMetricsSvc
} else {
cmp.preReload = disableMetricsSvc
}
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
if cmp, updated := pprofComponent(c); updated {
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
// Storage Engine
@ -1319,45 +1351,6 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
var components []dCmp
components = append(components, dCmp{"logger", logPrm.Reload})
components = append(components, dCmp{"runtime", func() error {
setRuntimeParameters(c)
return nil
}})
components = append(components, dCmp{"audit", func() error {
c.audit.Store(audit.Enabled(c.appCfg))
return nil
}})
components = append(components, dCmp{"pools", c.reloadPools})
components = append(components, dCmp{"tracing", func() error {
traceConfig, err := tracingconfig.ToTracingConfig(c.appCfg)
if err != nil {
return err
}
updated, err := tracing.Setup(ctx, *traceConfig)
if updated {
c.log.Info(logs.FrostFSNodeTracingConfigationUpdated)
}
return err
}})
if cmp, updated := metricsComponent(c); updated {
if cmp.enabled {
cmp.preReload = enableMetricsSvc
} else {
cmp.preReload = disableMetricsSvc
}
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
if cmp, updated := pprofComponent(c); updated {
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
return components
}
func (c *cfg) reloadPools() error {
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
@ -1414,8 +1407,4 @@ func (c *cfg) shutdown() {
for i := range c.closers {
c.closers[len(c.closers)-1-i].fn()
}
if err := sdnotify.ClearStatus(); err != nil {
c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
}
}

View file

@ -73,11 +73,12 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, wc.NoSync())
require.Equal(t, "tmp/0/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 3221225472, wc.SizeLimit())
require.EqualValues(t, 4096, wc.BoltDB().PageSize())
require.EqualValues(t, 49, wc.CountLimit())
require.EqualValues(t, uint64(100), wc.MaxFlushingObjectsSize())
require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@ -129,11 +130,12 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, wc.NoSync())
require.Equal(t, "tmp/1/cache", wc.Path())
require.EqualValues(t, 16384, wc.SmallObjectSize())
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 4294967296, wc.SizeLimit())
require.EqualValues(t, 0, wc.BoltDB().PageSize())
require.EqualValues(t, writecacheconfig.CountLimitDefault, wc.CountLimit())
require.EqualValues(t, writecacheconfig.MaxFlushingObjectsSizeDefault, wc.MaxFlushingObjectsSize())
require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@ -2,6 +2,7 @@ package writecacheconfig
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
boltdbconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/boltdb"
)
// Config is a wrapper over the config section
@ -9,6 +10,9 @@ import (
type Config config.Config
const (
// SmallSizeDefault is a default size of small objects.
SmallSizeDefault = 32 << 10
// MaxSizeDefault is a default value of the object payload size limit.
MaxSizeDefault = 64 << 20
@ -20,8 +24,6 @@ const (
// CountLimitDefault is a default write-cache count limit.
CountLimitDefault = 0
MaxFlushingObjectsSizeDefault = 128 << 20
)
// From wraps config section into Config.
@ -52,6 +54,22 @@ func (x *Config) Path() string {
return p
}
// SmallObjectSize returns the value of "small_object_size" config parameter.
//
// Returns SmallSizeDefault if the value is not a positive number.
func (x *Config) SmallObjectSize() uint64 {
s := config.SizeInBytesSafe(
(*config.Config)(x),
"small_object_size",
)
if s > 0 {
return s
}
return SmallSizeDefault
}
// MaxObjectSize returns the value of "max_object_size" config parameter.
//
// Returns MaxSizeDefault if the value is not a positive number.
@ -123,18 +141,7 @@ func (x *Config) NoSync() bool {
return config.BoolSafe((*config.Config)(x), "no_sync")
}
// MaxFlushingObjectsSize returns the value of "max_flushing_objects_size" config parameter.
//
// Returns MaxFlushingObjectsSizeDefault if the value is not a positive number.
func (x *Config) MaxFlushingObjectsSize() uint64 {
s := config.SizeInBytesSafe(
(*config.Config)(x),
"max_flushing_objects_size",
)
if s > 0 {
return s
}
return MaxFlushingObjectsSizeDefault
// BoltDB returns config instance for querying bolt db specific parameters.
func (x *Config) BoltDB() *boltdbconfig.Config {
return (*boltdbconfig.Config)(x)
}

View file

@ -52,14 +52,6 @@ func Destination(c *config.Config) string {
return DestinationDefault
}
// Timestamp returns the value of "timestamp" config parameter
// from "logger" section.
//
// Returns false if the value isn't specified.
func Timestamp(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "timestamp")
}
// ToLokiConfig extracts loki config.
func ToLokiConfig(c *config.Config) loki.Config {
hostname, _ := os.Hostname()

View file

@ -13,7 +13,6 @@ func TestLoggerSection_Level(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
require.Equal(t, loggerconfig.LevelDefault, loggerconfig.Level(configtest.EmptyConfig()))
require.Equal(t, loggerconfig.DestinationDefault, loggerconfig.Destination(configtest.EmptyConfig()))
require.Equal(t, false, loggerconfig.Timestamp(configtest.EmptyConfig()))
})
const path = "../../../../config/example/node"
@ -21,7 +20,6 @@ func TestLoggerSection_Level(t *testing.T) {
fileConfigTest := func(c *config.Config) {
require.Equal(t, "debug", loggerconfig.Level(c))
require.Equal(t, "journald", loggerconfig.Destination(c))
require.Equal(t, true, loggerconfig.Timestamp(c))
}
configtest.ForEachFileType(path, fileConfigTest)

View file

@ -1,11 +1,6 @@
package tracing
import (
"crypto/x509"
"errors"
"fmt"
"os"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -16,8 +11,8 @@ const (
)
// ToTracingConfig extracts tracing config.
func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
conf := &tracing.Config{
func ToTracingConfig(c *config.Config) *tracing.Config {
return &tracing.Config{
Enabled: config.BoolSafe(c.Sub(subsection), "enabled"),
Exporter: tracing.Exporter(config.StringSafe(c.Sub(subsection), "exporter")),
Endpoint: config.StringSafe(c.Sub(subsection), "endpoint"),
@ -25,20 +20,6 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
InstanceID: getInstanceIDOrDefault(c),
Version: misc.Version,
}
if trustedCa := config.StringSafe(c.Sub(subsection), "trusted_ca"); trustedCa != "" {
caBytes, err := os.ReadFile(trustedCa)
if err != nil {
return nil, fmt.Errorf("cannot read trusted ca cert by path: %w", err)
}
certPool := x509.NewCertPool()
ok := certPool.AppendCertsFromPEM(caBytes)
if !ok {
return nil, errors.New("can't fill cert pool by ca cert")
}
conf.ServerCaCertPool = certPool
}
return conf, nil
}
func getInstanceIDOrDefault(c *config.Config) string {

View file

@ -128,6 +128,9 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
cnrRdr.lister = client
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.src = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true
cnrWrt.eacls = cachedEACLStorage
}
return cnrRdr, cnrWrt
@ -244,6 +247,9 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
type morphContainerWriter struct {
neoClient *cntClient.Client
cacheEnabled bool
eacls ttlEACLStorage
}
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {

View file

@ -61,15 +61,13 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
if ni != nil {
s.nodeInfo.Store(*ni)
switch ni.Status() {
case netmapSDK.Online:
switch {
case ni.IsOnline():
ctrlNetSt = control.NetmapStatus_ONLINE
case netmapSDK.Offline:
case ni.IsOffline():
ctrlNetSt = control.NetmapStatus_OFFLINE
case netmapSDK.Maintenance:
case ni.IsMaintenance():
ctrlNetSt = control.NetmapStatus_MAINTENANCE
case netmapSDK.UnspecifiedState:
ctrlNetSt = control.NetmapStatus_STATUS_UNDEFINED
}
} else {
ctrlNetSt = control.NetmapStatus_OFFLINE
@ -80,7 +78,7 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
// nil ni means that the node is not included
// in the netmap
niOld.SetStatus(netmapSDK.Offline)
niOld.SetOffline()
s.nodeInfo.Store(niOld)
}
@ -141,7 +139,7 @@ func initNetmapService(ctx context.Context, c *cfg) {
network.WriteToNodeInfo(c.localAddr, &c.cfgNodeInfo.localInfo)
c.cfgNodeInfo.localInfo.SetPublicKey(c.key.PublicKey().Bytes())
parseAttributes(c)
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
c.cfgNodeInfo.localInfo.SetOffline()
if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
@ -254,25 +252,24 @@ func initNetmapState(c *cfg) {
zap.String("state", stateWord),
)
if ni != nil && ni.Status().IsMaintenance() {
if ni != nil && ni.IsMaintenance() {
c.isMaintenance.Store(true)
}
c.cfgNetmap.state.setCurrentEpoch(epoch)
c.cfgNetmap.startEpoch = epoch
c.setContractNodeInfo(ni)
}
func nodeState(ni *netmapSDK.NodeInfo) string {
if ni != nil {
switch ni.Status() {
case netmapSDK.Online:
switch {
case ni.IsOnline():
return "online"
case netmapSDK.Offline:
case ni.IsOffline():
return "offline"
case netmapSDK.Maintenance:
case ni.IsMaintenance():
return "maintenance"
case netmapSDK.UnspecifiedState:
return "undefined"
}
}
return "undefined"

View file

@ -473,6 +473,7 @@ func createACLServiceV2(c *cfg, apeSvc *objectAPE.Service, irFetcher *cachedIRFe
func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *objectAPE.Service {
return objectAPE.NewService(
c.log,
objectAPE.NewChecker(
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage(),
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),

View file

@ -21,9 +21,7 @@ type accessPolicyEngine struct {
var _ engine.MorphRuleChainStorageReader = (*morphAPEChainCache)(nil)
type morphAPEChainCacheKey struct {
// nolint:unused
name chain.Name
// nolint:unused
name chain.Name
target engine.Target
}

View file

@ -11,15 +11,11 @@ import (
)
func initTracing(ctx context.Context, c *cfg) {
conf, err := tracingconfig.ToTracingConfig(c.appCfg)
conf := tracingconfig.ToTracingConfig(c.appCfg)
_, err := tracing.Setup(ctx, *conf)
if err != nil {
c.log.Error(logs.FrostFSNodeFailedInitTracing, zap.Error(err))
return
}
_, err = tracing.Setup(ctx, *conf)
if err != nil {
c.log.Error(logs.FrostFSNodeFailedInitTracing, zap.Error(err))
return
}
c.closers = append(c.closers, closer{

View file

@ -1,107 +0,0 @@
package config_test
import (
"encoding/json"
"os"
"path"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config/test"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
func TestCreateReloadViper(t *testing.T) {
type m = map[string]any
dummyFileSize := 1 << 10
configPath := t.TempDir()
configFile := "000_a.yaml"
configDirPath := path.Join(configPath, "conf.d")
require.NoError(t, os.Mkdir(configDirPath, 0o700))
configtest.PrepareConfigFiles(t, configPath, []configtest.ConfigFile{
configtest.NewConfigFile(configFile, m{"a": "000"}, yaml.Marshal),
})
// Not valid configs, dummy files those appear lexicographically first.
configtest.PrepareDummyFiles(t, configDirPath, []configtest.DummyFile{
configtest.NewDummyFile("000_file_1", dummyFileSize),
configtest.NewDummyFile("000_file_2", dummyFileSize),
configtest.NewDummyFile("000_file_3", dummyFileSize),
})
configtest.PrepareConfigFiles(t, configDirPath, []configtest.ConfigFile{
// Valid configs with invalid extensions those appear lexicographically first.
configtest.NewConfigFile("001_a.yaml.un~", m{"a": "101"}, yaml.Marshal),
configtest.NewConfigFile("001_b.yml~", m{"b": m{"a": "102", "b": "103"}}, yaml.Marshal),
configtest.NewConfigFile("001_c.yaml.swp", m{"c": m{"a": "104", "b": "105"}}, yaml.Marshal),
configtest.NewConfigFile("001_d.json.swp", m{"d": m{"a": "106", "b": "107"}}, json.Marshal),
// Valid configs with valid extensions those should be loaded.
configtest.NewConfigFile("010_a.yaml", m{"a": "1"}, yaml.Marshal),
configtest.NewConfigFile("020_b.yml", m{"b": m{"a": "2", "b": "3"}}, yaml.Marshal),
configtest.NewConfigFile("030_c.json", m{"c": m{"a": "4", "b": "5"}}, json.Marshal),
// Valid configs with invalid extensions those appear lexicographically last.
configtest.NewConfigFile("099_a.yaml.un~", m{"a": "201"}, yaml.Marshal),
configtest.NewConfigFile("099_b.yml~", m{"b": m{"a": "202", "b": "203"}}, yaml.Marshal),
configtest.NewConfigFile("099_c.yaml.swp", m{"c": m{"a": "204", "b": "205"}}, yaml.Marshal),
configtest.NewConfigFile("099_c.json.swp", m{"d": m{"a": "206", "b": "207"}}, json.Marshal),
})
// Not valid configs, dummy files those appear lexicographically last.
configtest.PrepareDummyFiles(t, configDirPath, []configtest.DummyFile{
configtest.NewDummyFile("999_file_1", dummyFileSize),
configtest.NewDummyFile("999_file_2", dummyFileSize),
configtest.NewDummyFile("999_file_3", dummyFileSize),
})
finalConfig := m{"a": "1", "b": m{"a": "2", "b": "3"}, "c": m{"a": "4", "b": "5"}}
var (
v *viper.Viper
err error
)
t.Run("create config with config dir only", func(t *testing.T) {
v, err = config.CreateViper(
config.WithConfigDir(configDirPath),
)
require.NoError(t, err)
assert.Equal(t, finalConfig, v.AllSettings())
})
t.Run("reload config with config dir only", func(t *testing.T) {
err = config.ReloadViper(
config.WithViper(v),
config.WithConfigDir(configDirPath),
)
require.NoError(t, err)
assert.Equal(t, finalConfig, v.AllSettings())
})
t.Run("create config with both config and config dir", func(t *testing.T) {
v, err = config.CreateViper(
config.WithConfigFile(path.Join(configPath, configFile)),
config.WithConfigDir(configDirPath),
)
require.NoError(t, err)
assert.Equal(t, finalConfig, v.AllSettings())
})
t.Run("reload config with both config and config dir", func(t *testing.T) {
err = config.ReloadViper(
config.WithViper(v),
config.WithConfigFile(path.Join(configPath, configFile)),
config.WithConfigDir(configDirPath),
)
require.NoError(t, err)
assert.Equal(t, finalConfig, v.AllSettings())
})
}

View file

@ -14,14 +14,14 @@ func PrettyPrintNodeInfo(cmd *cobra.Command, node netmap.NodeInfo,
) {
var strState string
switch node.Status() {
switch {
default:
strState = "STATE_UNSUPPORTED"
case netmap.Online:
case node.IsOnline():
strState = "ONLINE"
case netmap.Offline:
case node.IsOffline():
strState = "OFFLINE"
case netmap.Maintenance:
case node.IsMaintenance():
strState = "MAINTENANCE"
}

View file

@ -1,5 +1,4 @@
FROSTFS_IR_LOGGER_LEVEL=info
FROSTFS_IR_LOGGER_TIMESTAMP=true
FROSTFS_IR_WALLET_PATH=/path/to/wallet.json
FROSTFS_IR_WALLET_ADDRESS=NUHtW3eM6a4mmFCgyyr4rj4wygsTKB88XX

View file

@ -2,7 +2,6 @@
logger:
level: info # Logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
timestamp: true
wallet:
path: /path/to/wallet.json # Path to NEP-6 NEO wallet file

View file

@ -1,6 +1,5 @@
FROSTFS_LOGGER_LEVEL=debug
FROSTFS_LOGGER_DESTINATION=journald
FROSTFS_LOGGER_TIMESTAMP=true
FROSTFS_PPROF_ENABLED=true
FROSTFS_PPROF_ADDRESS=localhost:6060
@ -107,7 +106,6 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_COUNT=49
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_FLUSHING_OBJECTS_SIZE=100
### Metabase config
FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644
@ -200,7 +198,6 @@ FROSTFS_STORAGE_SHARD_1_GC_REMOVER_SLEEP_INTERVAL=5m
FROSTFS_TRACING_ENABLED=true
FROSTFS_TRACING_ENDPOINT="localhost"
FROSTFS_TRACING_EXPORTER="otlp_grpc"
FROSTFS_TRACING_TRUSTED_CA=""
FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824

View file

@ -1,8 +1,7 @@
{
"logger": {
"level": "debug",
"destination": "journald",
"timestamp": true
"destination": "journald"
},
"pprof": {
"enabled": true,
@ -150,8 +149,7 @@
"flush_worker_count": 30,
"capacity": 3221225472,
"page_size": 4096,
"max_object_count": 49,
"max_flushing_objects_size": 100
"max_object_count": 49
},
"metabase": {
"path": "tmp/0/meta",
@ -256,8 +254,7 @@
"tracing": {
"enabled": true,
"endpoint": "localhost:9090",
"exporter": "otlp_grpc",
"trusted_ca": "/etc/ssl/tracing.pem"
"exporter": "otlp_grpc"
},
"runtime": {
"soft_memory_limit": 1073741824

View file

@ -1,7 +1,6 @@
logger:
level: debug # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
destination: journald # logger destination: one of "stdout" (default), "journald"
timestamp: true
systemdnotify:
enabled: true
@ -173,7 +172,6 @@ storage:
capacity: 3221225472 # approximate write-cache total size, bytes
max_object_count: 49
page_size: 4k
max_flushing_objects_size: 100b
metabase:
path: tmp/0/meta # metabase path
@ -232,7 +230,6 @@ tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost"
trusted_ca: ""
runtime:
soft_memory_limit: 1gb

View file

@ -67,7 +67,7 @@ NEOFS_NODE_ATTRIBUTE_2=UN-LOCODE:RU LED
```
You can validate UN/LOCODE attribute in
[NeoFS LOCODE database](https://git.frostfs.info/TrueCloudLab/frostfs-locode-db/releases/tag/v0.4.0)
[NeoFS LOCODE database](https://github.com/TrueCloudLab/frostfs-locode-db/releases/tag/v0.1.0)
with frostfs-cli.
```

View file

@ -3,7 +3,7 @@
version: "2.4"
services:
neo-go:
image: nspccdev/neo-go:0.106.0
image: nspccdev/neo-go:0.105.0
container_name: neo-go
command: ["node", "--config-path", "/config", "--privnet", "--debug"]
stop_signal: SIGKILL

View file

@ -20,12 +20,7 @@ Because it is necessary to prevent removing by policer objects with policy `REP
## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`.
To adjust resource consumption required for evacuation use options:
- `--container-worker-count` count of concurrent container evacuation workers
- `--object-worker-count` count of concurrent object evacuation workers
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
`frostfs-cli control shards evacuation stop` stops running evacuation process.

View file

@ -9,7 +9,7 @@ These should run successfully:
* `make lint` (should not change any files);
* `make fmts` (should not change any files);
* `go mod tidy` (should not change any files);
* integration tests in [frostfs-devenv](https://git.frostfs.info/TrueCloudLab/frostfs-dev-env).
* integration tests in [frostfs-devenv](https://github.com/TrueCloudLab/frostfs-devenv).
## Make release commit
@ -123,12 +123,12 @@ the release. Publish the release.
### Update FrostFS Developer Environment
Prepare pull-request in [frostfs-devenv](https://git.frostfs.info/TrueCloudLab/frostfs-dev-env)
Prepare pull-request in [frostfs-devenv](https://github.com/TrueCloudLab/frostfs-devenv)
with new versions.
### Close GitHub milestone
Look up [milestones](https://git.frostfs.info/TrueCloudLab/frostfs-node/milestones) and close the release one if exists.
Look up GitHub [milestones](https://github.com/TrueCloudLab/frostfs-node/milestones) and close the release one if exists.
### Rebuild FrostFS LOCODE database

View file

@ -287,18 +287,23 @@ writecache:
enabled: true
path: /path/to/writecache
capacity: 4294967296
small_object_size: 16384
max_object_size: 134217728
flush_worker_count: 30
page_size: '4k'
```
| Parameter | Type | Default value | Description |
| --------------------------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------- |
| `path` | `string` | | Path to the metabase file. |
| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_flushing_objects_size` | `size` | `512M` | Max total size of background flushing objects. |
| Parameter | Type | Default value | Description |
|----------------------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|
| `path` | `string` | | Path to the metabase file. |
| `capacity` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `max_object_count` | `int` | unrestricted | Approximate maximum objects count in the writecache. If the writecache is full, objects are written to the blobstor directly. |
| `small_object_size` | `size` | `32K` | Maximum object size for "small" objects. This objects are stored in a key-value database instead of a file-system. |
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
| `page_size` | `size` | `0` | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. |
# `node` section

48
go.mod
View file

@ -4,12 +4,12 @@ go 1.22
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240916093537-13fa0da3741e
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240909072709-3e221b973a3c
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240902111049-c11f50efeccb
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240903093628-8f751d9dd0ad
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
@ -28,7 +28,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/nspcc-dev/neo-go v0.106.3
github.com/nspcc-dev/neo-go v0.106.2
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0
github.com/prometheus/client_golang v1.19.0
@ -40,15 +40,15 @@ require (
github.com/ssgreg/journald v1.0.0
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.10
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
golang.org/x/term v0.21.0
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.34.2
golang.org/x/term v0.18.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)
@ -63,7 +63,7 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
@ -73,13 +73,13 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/go-fed/httpsig v1.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
@ -100,7 +100,7 @@ require (
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240727093519-1a48f1ce43ec // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@ -115,18 +115,18 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/urfave/cli v1.22.14 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.22.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -542,7 +542,6 @@ const (
StartedWritecacheSealAsync = "started writecache seal async"
WritecacheSealCompletedAsync = "writecache seal completed successfully"
FailedToSealWritecacheAsync = "failed to seal writecache async"
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty"
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
WritecacheCantGetObject = "can't get an object from fstree"
)

View file

@ -17,8 +17,7 @@ type InnerRingServiceMetrics struct {
eventDuration *prometheus.HistogramVec
morphCacheMetrics *morphCacheMetrics
logMetrics logger.LogMetrics
// nolint: unused
appInfo *ApplicationInfo
appInfo *ApplicationInfo
}
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.

View file

@ -25,8 +25,7 @@ type NodeMetrics struct {
morphClient *morphClientMetrics
morphCache *morphCacheMetrics
log logger.LogMetrics
// nolint: unused
appInfo *ApplicationInfo
appInfo *ApplicationInfo
}
func NewNodeMetrics() *NodeMetrics {

View file

@ -5,7 +5,9 @@ import (
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
type Option func(*cfg)
@ -16,6 +18,7 @@ type cfg struct {
noSync bool
maxBatchDelay time.Duration
maxBatchSize int
log *logger.Logger
}
func defaultCfg() *cfg {
@ -23,6 +26,7 @@ func defaultCfg() *cfg {
perm: os.ModePerm,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize,
log: &logger.Logger{Logger: zap.L()},
}
}
@ -55,3 +59,9 @@ func WithMaxBatchSize(maxBatchSize int) Option {
c.maxBatchSize = maxBatchSize
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}

View file

@ -361,7 +361,7 @@ func (v *FormatValidator) checkIfExpired(ctx context.Context, obj *objectSDK.Obj
func expirationEpochAttribute(obj *objectSDK.Object) (uint64, error) {
for _, a := range obj.Attributes() {
if a.Key() != objectV2.SysAttributeExpEpoch {
if a.Key() != objectV2.SysAttributeExpEpoch && a.Key() != objectV2.SysAttributeExpEpochNeoFS {
continue
}

View file

@ -163,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
Log: s.log,
Metrics: s.irMetrics,
FrostFSClient: frostfsCli,
NetmapClient: s.netmapClient,
AlphabetState: s,
EpochState: s,
Voter: s,

View file

@ -103,8 +103,6 @@ type (
// to the application.
runners []func(chan<- error) error
// cmode used for upgrade scenario.
// nolint:unused
cmode *atomic.Bool
}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
@ -37,6 +38,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
alphabetKeys: testKeys.mainnetKeys,
}
f := &testFrostFSClient{}
nm := &testNetmapClient{}
proc, err := New(
&Params{
@ -48,6 +50,7 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
MorphClient: m,
MainnetClient: mn,
FrostFSClient: f,
NetmapClient: nm,
},
)
@ -70,6 +73,10 @@ func TestHandleAlphabetSyncEvent(t *testing.T) {
},
}, v.votes, "invalid vote calls")
var irUpdateExp []nmClient.UpdateIRPrm
require.EqualValues(t, irUpdateExp, nm.updates, "invalid IR updates")
var expAlphabetUpdate client.UpdateAlphabetListPrm
expAlphabetUpdate.SetHash(ev.txHash)
expAlphabetUpdate.SetList(testKeys.newInnerRingExp)
@ -112,6 +119,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
alphabetKeys: testKeys.mainnetKeys,
}
f := &testFrostFSClient{}
nm := &testNetmapClient{}
proc, err := New(
&Params{
@ -123,6 +131,7 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
MorphClient: m,
MainnetClient: mn,
FrostFSClient: f,
NetmapClient: nm,
},
)
@ -146,6 +155,9 @@ func TestHandleAlphabetDesignateEvent(t *testing.T) {
},
}, v.votes, "invalid vote calls")
var irUpdatesExp []nmClient.UpdateIRPrm
require.EqualValues(t, irUpdatesExp, nm.updates, "invalid IR updates")
var alpabetUpdExp client.UpdateAlphabetListPrm
alpabetUpdExp.SetList(testKeys.newInnerRingExp)
alpabetUpdExp.SetHash(ev.TxHash)
@ -281,3 +293,12 @@ func (c *testFrostFSClient) AlphabetUpdate(p frostfscontract.AlphabetUpdatePrm)
c.updates = append(c.updates, p)
return nil
}
type testNetmapClient struct {
updates []nmClient.UpdateIRPrm
}
func (c *testNetmapClient) UpdateInnerRing(p nmClient.UpdateIRPrm) error {
c.updates = append(c.updates, p)
return nil
}

View file

@ -79,6 +79,7 @@ type (
metrics metrics.Register
pool *ants.Pool
frostfsClient FrostFSClient
netmapClient NetmapClient
alphabetState AlphabetState
epochState EpochState
@ -104,6 +105,7 @@ type (
MorphClient MorphClient
MainnetClient MainnetClient
FrostFSClient FrostFSClient
NetmapClient NetmapClient
}
)
@ -144,6 +146,7 @@ func New(p *Params) (*Processor, error) {
metrics: metricsRegister,
pool: pool,
frostfsClient: p.FrostFSClient,
netmapClient: p.NetmapClient,
alphabetState: p.AlphabetState,
epochState: p.EpochState,
voter: p.Voter,

View file

@ -60,7 +60,7 @@ func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) {
}
access.binNodeInfo = binNodeInfo
access.maintenance = nmNodes[i].Status().IsMaintenance()
access.maintenance = nmNodes[i].IsMaintenance()
newMap[keyString] = access
}

View file

@ -127,7 +127,7 @@ func TestCleanupTable(t *testing.T) {
t.Run("skip maintenance nodes", func(t *testing.T) {
cnt := 0
infos[1].SetStatus(netmap.Maintenance)
infos[1].SetMaintenance()
key := netmap.StringifyPublicKey(infos[1])
c.update(networkMap, 5)

View file

@ -146,7 +146,7 @@ func TestAddPeer(t *testing.T) {
require.Nil(t, nc.notaryInvokes, "invalid notary invokes")
node.SetStatus(netmap.Online)
node.SetOnline()
ev = netmapEvent.AddPeer{
NodeBytes: node.Marshal(),
Request: &payload.P2PNotaryRequest{

View file

@ -56,11 +56,11 @@ func (x *NetMapCandidateValidator) SetNetworkSettings(netSettings NetworkSetting
//
// See also netmap.NodeInfo.IsOnline/SetOnline and other similar methods.
func (x *NetMapCandidateValidator) VerifyAndUpdate(node *netmap.NodeInfo) error {
if node.Status().IsOnline() {
if node.IsOnline() {
return nil
}
if node.Status().IsMaintenance() {
if node.IsMaintenance() {
return x.netSettings.MaintenanceModeAllowed()
}

View file

@ -41,22 +41,22 @@ func TestValidator_VerifyAndUpdate(t *testing.T) {
},
{
name: "ONLINE",
preparer: func(ni *netmap.NodeInfo) { ni.SetStatus(netmap.Online) },
preparer: (*netmap.NodeInfo).SetOnline,
valid: true,
},
{
name: "OFFLINE",
preparer: func(ni *netmap.NodeInfo) { ni.SetStatus(netmap.Offline) },
preparer: (*netmap.NodeInfo).SetOffline,
valid: false,
},
{
name: "MAINTENANCE/allowed",
preparer: func(ni *netmap.NodeInfo) { ni.SetStatus(netmap.Maintenance) },
preparer: (*netmap.NodeInfo).SetMaintenance,
valid: true,
},
{
name: "MAINTENANCE/disallowed",
preparer: func(ni *netmap.NodeInfo) { ni.SetStatus(netmap.Maintenance) },
preparer: (*netmap.NodeInfo).SetMaintenance,
valid: false,
validatorPreparer: func(v *state.NetMapCandidateValidator) {
var s testNetworkSettings

View file

@ -62,7 +62,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
// But there is no guarantee that code will be executed in the same order.
// That is why we need to perform `addPeerIR` only in case when node is online,
// because in scope of this method, contract set state `ONLINE` for the node.
if updated && nodeInfo.Status().IsOnline() {
if updated && nodeInfo.IsOnline() {
np.log.Info(logs.NetmapApprovingNetworkMapCandidate,
zap.String("key", keyString))

View file

@ -8,7 +8,6 @@ import (
type DeletePrm struct {
Address oid.Address
StorageID []byte
Size uint64
}
// DeleteRes groups the resulting values of Delete operation.

View file

@ -1,21 +1,22 @@
package fstree
import (
"sync"
"math"
"sync/atomic"
)
// FileCounter used to count files in FSTree. The implementation must be thread-safe.
type FileCounter interface {
Set(count, size uint64)
Inc(size uint64)
Dec(size uint64)
Set(v uint64)
Inc()
Dec()
}
type noopCounter struct{}
func (c *noopCounter) Set(uint64, uint64) {}
func (c *noopCounter) Inc(uint64) {}
func (c *noopCounter) Dec(uint64) {}
func (c *noopCounter) Set(uint64) {}
func (c *noopCounter) Inc() {}
func (c *noopCounter) Dec() {}
func counterEnabled(c FileCounter) bool {
_, noop := c.(*noopCounter)
@ -23,50 +24,14 @@ func counterEnabled(c FileCounter) bool {
}
type SimpleCounter struct {
mtx sync.RWMutex
count uint64
size uint64
v atomic.Uint64
}
func NewSimpleCounter() *SimpleCounter {
return &SimpleCounter{}
}
func (c *SimpleCounter) Set(count, size uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.count = count
c.size = size
}
func (c *SimpleCounter) Inc(size uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.count++
c.size += size
}
func (c *SimpleCounter) Dec(size uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.count > 0 {
c.count--
} else {
panic("fstree.SimpleCounter: invalid count")
}
if c.size >= size {
c.size -= size
} else {
panic("fstree.SimpleCounter: invalid size")
}
}
func (c *SimpleCounter) CountSize() (uint64, uint64) {
c.mtx.RLock()
defer c.mtx.RUnlock()
return c.count, c.size
}
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
func (c *SimpleCounter) Inc() { c.v.Add(1) }
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }

View file

@ -222,81 +222,6 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr
return nil
}
type ObjectInfo struct {
Address oid.Address
DataSize uint64
}
type IterateInfoHandler func(ObjectInfo) error
func (t *FSTree) IterateInfo(ctx context.Context, handler IterateInfoHandler) error {
var (
err error
startedAt = time.Now()
)
defer func() {
t.metrics.IterateInfo(time.Since(startedAt), err == nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "FSTree.IterateInfo")
defer span.End()
return t.iterateInfo(ctx, 0, []string{t.RootPath}, handler)
}
func (t *FSTree) iterateInfo(ctx context.Context, depth uint64, curPath []string, handler IterateInfoHandler) error {
curName := strings.Join(curPath[1:], "")
dirPath := filepath.Join(curPath...)
entries, err := os.ReadDir(dirPath)
if err != nil {
return fmt.Errorf("read fstree dir '%s': %w", dirPath, err)
}
isLast := depth >= t.Depth
l := len(curPath)
curPath = append(curPath, "")
for i := range entries {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
curPath[l] = entries[i].Name()
if !isLast && entries[i].IsDir() {
err := t.iterateInfo(ctx, depth+1, curPath, handler)
if err != nil {
return err
}
}
if depth != t.Depth {
continue
}
addr, err := addressFromString(curName + entries[i].Name())
if err != nil {
continue
}
info, err := entries[i].Info()
if err != nil {
if os.IsNotExist(err) {
continue
}
return err
}
err = handler(ObjectInfo{
Address: addr,
DataSize: uint64(info.Size()),
})
if err != nil {
return err
}
}
return nil
}
func (t *FSTree) treePath(addr oid.Address) string {
sAddr := stringifyAddress(addr)
@ -338,7 +263,7 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
}
p := t.treePath(prm.Address)
err = t.writer.removeFile(p, prm.Size)
err = t.writer.removeFile(p)
return common.DeleteRes{}, err
}
@ -510,38 +435,32 @@ func (t *FSTree) initFileCounter() error {
return nil
}
count, size, err := t.countFiles()
counter, err := t.countFiles()
if err != nil {
return err
}
t.fileCounter.Set(count, size)
t.fileCounter.Set(counter)
return nil
}
func (t *FSTree) countFiles() (uint64, uint64, error) {
var count, size uint64
func (t *FSTree) countFiles() (uint64, error) {
var counter uint64
// it is simpler to just consider every file
// that is not directory as an object
err := filepath.WalkDir(t.RootPath,
func(_ string, d fs.DirEntry, _ error) error {
if d.IsDir() {
return nil
if !d.IsDir() {
counter++
}
count++
info, err := d.Info()
if err != nil {
return err
}
size += uint64(info.Size())
return nil
},
)
if err != nil {
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
}
return count, size, nil
return counter, nil
}
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {

View file

@ -47,9 +47,8 @@ func TestObjectCounter(t *testing.T) {
require.NoError(t, fst.Open(mode.ComponentReadWrite))
require.NoError(t, fst.Init())
count, size := counter.CountSize()
require.Equal(t, uint64(0), count)
require.Equal(t, uint64(0), size)
counterValue := counter.Value()
require.Equal(t, uint64(0), counterValue)
defer func() {
require.NoError(t, fst.Close())
@ -65,73 +64,39 @@ func TestObjectCounter(t *testing.T) {
putPrm.Address = addr
putPrm.RawData, _ = obj.Marshal()
var getPrm common.GetPrm
getPrm.Address = putPrm.Address
var delPrm common.DeletePrm
delPrm.Address = addr
t.Run("without size hint", func(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background())
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
for range 1_000 {
_, err := fst.Put(egCtx, putPrm)
if err != nil {
return err
}
eg.Go(func() error {
for range 1_000 {
_, err := fst.Put(egCtx, putPrm)
if err != nil {
return err
}
return nil
})
eg.Go(func() error {
var le logicerr.Logical
for range 1_000 {
_, err := fst.Delete(egCtx, delPrm)
if err != nil && !errors.As(err, &le) {
return err
}
}
return nil
})
require.NoError(t, eg.Wait())
count, size = counter.CountSize()
realCount, realSize, err := fst.countFiles()
require.NoError(t, err)
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
}
return nil
})
t.Run("with size hint", func(t *testing.T) {
delPrm.Size = uint64(len(putPrm.RawData))
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
for range 1_000 {
_, err := fst.Put(egCtx, putPrm)
if err != nil {
return err
}
eg.Go(func() error {
var le logicerr.Logical
for range 1_000 {
_, err := fst.Delete(egCtx, delPrm)
if err != nil && !errors.As(err, &le) {
return err
}
return nil
})
eg.Go(func() error {
var le logicerr.Logical
for range 1_000 {
_, err := fst.Delete(egCtx, delPrm)
if err != nil && !errors.As(err, &le) {
return err
}
}
return nil
})
require.NoError(t, eg.Wait())
count, size = counter.CountSize()
realCount, realSize, err := fst.countFiles()
require.NoError(t, err)
require.Equal(t, realCount, count, "real %d, actual %d", realCount, count)
require.Equal(t, realSize, size, "real %d, actual %d", realSize, size)
}
return nil
})
require.NoError(t, eg.Wait())
counterValue = counter.Value()
realCount, err := fst.countFiles()
require.NoError(t, err)
require.Equal(t, realCount, counterValue)
}

View file

@ -16,7 +16,7 @@ import (
type writer interface {
writeData(string, []byte) error
removeFile(string, uint64) error
removeFile(string) error
}
type genericWriter struct {
@ -78,14 +78,14 @@ func (w *genericWriter) writeAndRename(tmpPath, p string, data []byte) error {
}
if w.fileCounterEnabled {
w.fileCounter.Inc(uint64(len(data)))
w.fileCounter.Inc()
var targetFileExists bool
if _, e := os.Stat(p); e == nil {
targetFileExists = true
}
err = os.Rename(tmpPath, p)
if err == nil && targetFileExists {
w.fileCounter.Dec(uint64(len(data)))
w.fileCounter.Dec()
}
} else {
err = os.Rename(tmpPath, p)
@ -107,10 +107,15 @@ func (w *genericWriter) writeFile(p string, data []byte) error {
return err
}
func (w *genericWriter) removeFile(p string, size uint64) error {
func (w *genericWriter) removeFile(p string) error {
var err error
if w.fileCounterEnabled {
err = w.removeWithCounter(p, size)
w.fileGuard.Lock(p)
err = os.Remove(p)
w.fileGuard.Unlock(p)
if err == nil {
w.fileCounter.Dec()
}
} else {
err = os.Remove(p)
}
@ -120,22 +125,3 @@ func (w *genericWriter) removeFile(p string, size uint64) error {
}
return err
}
func (w *genericWriter) removeWithCounter(p string, size uint64) error {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
if size == 0 {
stat, err := os.Stat(p)
if err != nil {
return err
}
size = uint64(stat.Size())
}
if err := os.Remove(p); err != nil {
return err
}
w.fileCounter.Dec(uint64(size))
return nil
}

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"golang.org/x/sys/unix"
)
@ -19,9 +18,7 @@ type linuxWriter struct {
perm uint32
flags int
fileGuard keyLock
fileCounter FileCounter
fileCounterEnabled bool
counter FileCounter
}
func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync bool) writer {
@ -36,18 +33,11 @@ func newSpecificWriteData(c FileCounter, root string, perm fs.FileMode, noSync b
return nil
}
_ = unix.Close(fd) // Don't care about error.
var fileGuard keyLock = &noopKeyLock{}
fileCounterEnabled := counterEnabled(c)
if fileCounterEnabled {
fileGuard = utilSync.NewKeyLocker[string]()
}
w := &linuxWriter{
root: root,
perm: uint32(perm),
flags: flags,
fileGuard: fileGuard,
fileCounter: c,
fileCounterEnabled: fileCounterEnabled,
root: root,
perm: uint32(perm),
flags: flags,
counter: c,
}
return w
}
@ -61,10 +51,6 @@ func (w *linuxWriter) writeData(p string, data []byte) error {
}
func (w *linuxWriter) writeFile(p string, data []byte) error {
if w.fileCounterEnabled {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
}
fd, err := unix.Open(w.root, w.flags, w.perm)
if err != nil {
return err
@ -75,7 +61,7 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
if n == len(data) {
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
if err == nil {
w.fileCounter.Inc(uint64(len(data)))
w.counter.Inc()
}
if errors.Is(err, unix.EEXIST) {
err = nil
@ -91,30 +77,13 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
return errClose
}
func (w *linuxWriter) removeFile(p string, size uint64) error {
if w.fileCounterEnabled {
w.fileGuard.Lock(p)
defer w.fileGuard.Unlock(p)
if size == 0 {
var stat unix.Stat_t
err := unix.Stat(p, &stat)
if err != nil {
if err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return err
}
size = uint64(stat.Size)
}
}
func (w *linuxWriter) removeFile(p string) error {
err := unix.Unlink(p)
if err != nil && err == unix.ENOENT {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if err == nil {
w.fileCounter.Dec(uint64(size))
w.counter.Dec()
}
return err
}

View file

@ -13,7 +13,6 @@ type Metrics interface {
Close()
Iterate(d time.Duration, success bool)
IterateInfo(d time.Duration, success bool)
Delete(d time.Duration, success bool)
Exists(d time.Duration, success bool)
Put(d time.Duration, size int, success bool)
@ -28,7 +27,6 @@ func (m *noopMetrics) SetParentID(string) {}
func (m *noopMetrics) SetMode(mode.ComponentMode) {}
func (m *noopMetrics) Close() {}
func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) IterateInfo(time.Duration, bool) {}
func (m *noopMetrics) Delete(time.Duration, bool) {}
func (m *noopMetrics) Exists(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {}

View file

@ -10,11 +10,11 @@ func (s *memstoreImpl) Open(mod mode.ComponentMode) error {
return nil
}
func (s *memstoreImpl) Init() error { return nil }
func (s *memstoreImpl) Close() error { return nil }
func (s *memstoreImpl) Type() string { return Type }
func (s *memstoreImpl) Path() string { return s.rootPath }
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *memstoreImpl) Compressor() *compression.Config { return s.compression }
func (s *memstoreImpl) SetReportErrorFunc(func(string, error)) {}
func (s *memstoreImpl) SetParentID(string) {}
func (s *memstoreImpl) Init() error { return nil }
func (s *memstoreImpl) Close() error { return nil }
func (s *memstoreImpl) Type() string { return Type }
func (s *memstoreImpl) Path() string { return s.rootPath }
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *memstoreImpl) Compressor() *compression.Config { return s.compression }
func (s *memstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
func (s *memstoreImpl) SetParentID(string) {}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
)
@ -15,6 +16,7 @@ import (
func TestSimpleLifecycle(t *testing.T) {
s := New(
WithRootPath("memstore"),
WithLogger(test.NewLogger(t)),
)
defer func() { require.NoError(t, s.Close()) }()
require.NoError(t, s.Open(mode.ComponentReadWrite))

View file

@ -2,20 +2,33 @@ package memstore
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
type cfg struct {
log *logger.Logger
rootPath string
readOnly bool
compression *compression.Config
reportError func(string, error)
}
func defaultConfig() *cfg {
return &cfg{}
return &cfg{
log: &logger.Logger{Logger: zap.L()},
reportError: func(string, error) {},
}
}
type Option func(*cfg)
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithRootPath(p string) Option {
return func(c *cfg) {
c.rootPath = p

View file

@ -249,9 +249,23 @@ func (e *StorageEngine) ResumeExecution() error {
}
type ReConfiguration struct {
errorsThreshold uint32
shardPoolSize uint32
shards map[string][]shard.Option // meta path -> shard opts
}
// SetErrorsThreshold sets a size amount of errors after which
// shard is moved to read-only mode.
func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
rCfg.errorsThreshold = errorsThreshold
}
// SetShardPoolSize sets a size of worker pool for each shard.
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
rCfg.shardPoolSize = shardPoolSize
}
// AddShard adds a shard for the reconfiguration.
// Shard identifier is calculated from paths used in blobstor.
func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {

View file

@ -9,15 +9,14 @@ import (
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -25,16 +24,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const (
// containerWorkerCountDefault is a default value of the count of
// concurrent container evacuation workers.
containerWorkerCountDefault = 10
// objectWorkerCountDefault is a default value of the count of
// concurrent object evacuation workers.
objectWorkerCountDefault = 10
)
var (
@ -90,10 +79,6 @@ type EvacuateShardPrm struct {
IgnoreErrors bool
Async bool
Scope EvacuateScope
RepOneOnly bool
ContainerWorkerCount uint32
ObjectWorkerCount uint32
}
// EvacuateShardRes represents result of the EvacuateShard operation.
@ -204,6 +189,8 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res
}
const defaultEvacuateBatchSize = 100
type pooledShard struct {
hashedShard
pool util.WorkerPool
@ -255,16 +242,8 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return nil, err
}
var mtx sync.RWMutex
copyShards := func() []pooledShard {
mtx.RLock()
defer mtx.RUnlock()
t := make([]pooledShard, len(shards))
copy(t, shards)
return t
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
})
if prm.Async {
@ -282,7 +261,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@ -291,7 +270,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),
))
defer func() {
@ -309,39 +287,13 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err
}
ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
continueLoop := true
for i := 0; continueLoop && i < len(shardIDs); i++ {
select {
case <-ctx.Done():
continueLoop = false
default:
egShard.Go(func() error {
err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject)
if err != nil {
cancel(err)
}
return err
})
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
}
err = egShard.Wait()
if err != nil {
err = fmt.Errorf("shard error: %w", err)
}
errContainer := egContainer.Wait()
errObject := egObject.Wait()
if errContainer != nil {
err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
}
if errObject != nil {
err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
}
if err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
zap.Strings("shard_ids", shardIDs),
@ -357,27 +309,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return nil
}
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
operationCtx, cancel := context.WithCancelCause(ctx)
egObject, _ := errgroup.WithContext(operationCtx)
objectWorkerCount := prm.ObjectWorkerCount
if objectWorkerCount == 0 {
objectWorkerCount = objectWorkerCountDefault
}
egObject.SetLimit(int(objectWorkerCount))
egContainer, _ := errgroup.WithContext(operationCtx)
containerWorkerCount := prm.ContainerWorkerCount
if containerWorkerCount == 0 {
containerWorkerCount = containerWorkerCountDefault
}
egContainer.SetLimit(int(containerWorkerCount))
egShard, _ := errgroup.WithContext(operationCtx)
return operationCtx, cancel, egShard, egContainer, egObject
}
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
defer span.End()
@ -404,9 +335,8 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
return nil
}
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes(
@ -415,10 +345,11 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.Cancel
defer span.End()
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
@ -428,81 +359,44 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.Cancel
return nil
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
egContainer.Go(func() error {
var skip bool
c, err := e.containerSource.Load().cs.Get(cnt)
if err != nil {
if client.IsErrContainerNotFound(err) {
skip = true
} else {
return err
}
}
if !skip && prm.RepOneOnly {
skip = e.isNotRepOne(c)
}
if skip {
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
if err != nil {
return err
}
res.objSkipped.Add(count)
return nil
}
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
egObject.Go(func() error {
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
if err != nil {
cancel(err)
}
return err
})
return nil
}
err = sh.IterateOverObjectsInContainer(ctx, objPrm)
if err != nil {
cancel(err)
}
return err
})
return nil
}
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
err := sh.IterateOverContainers(ctx, cntPrm)
if err != nil {
cancel(err)
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
return err
}
c = listRes.Cursor()
}
return err
return nil
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
shards := getShards()
var listPrm pilorama.TreeListTreesPrm
first := true
@ -743,79 +637,72 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
return shards, nil
}
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
defer span.End()
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
shards := getShards()
addr := objInfo.Address
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.objFailed.Add(1)
return nil
for i := range toEvacuate {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
addr := toEvacuate[i].Address
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
if err != nil {
return err
}
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
if evacuatedLocal {
return nil
}
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.objFailed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, objInfo)
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
if err != nil {
return err
}
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
if evacuatedLocal {
continue
}
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
}
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
}
}
return nil
}
func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
p := c.Value.PlacementPolicy()
for i := range p.NumberOfReplicas() {
if p.ReplicaDescriptor(i).NumberOfObjects() > 1 {
return true
}
}
return false
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
) (bool, error) {

View file

@ -6,12 +6,9 @@ import (
"fmt"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -21,38 +18,14 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
type containerStorage struct {
cntmap map[cid.ID]*container.Container
latency time.Duration
}
func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
time.Sleep(cs.latency)
v, ok := cs.cntmap[id]
if !ok {
return nil, new(apistatus.ContainerNotFound)
}
coreCnt := coreContainer.Container{
Value: *v,
}
return &coreCnt, nil
}
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
return nil, nil
}
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir()
@ -86,15 +59,10 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
{Key: pilorama.AttributeVersion, Value: []byte("XXX")},
{Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
}
cnrMap := make(map[cid.ID]*container.Container)
for _, sh := range ids {
for i := range objPerShard {
// Create dummy container
cnr1 := container.Container{}
cnr1.SetAttribute("cnr", "cnr"+strconv.Itoa(i))
contID := cidtest.ID()
cnrMap[contID] = &cnr1
for _, sh := range ids {
for range objPerShard {
contID := cidtest.ID()
obj := testutil.GenerateObjectWithCID(contID)
objects = append(objects, obj)
@ -108,7 +76,6 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
require.NoError(t, err)
}
}
e.SetContainerSource(&containerStorage{cntmap: cnrMap})
return e, ids, objects
}
@ -207,16 +174,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
errReplication := errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
var n atomic.Uint64
var mtx sync.Mutex
var n uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
mtx.Lock()
defer mtx.Unlock()
if n.Load() == max {
if n == max {
return false, errReplication
}
n.Add(1)
n++
for i := range objects {
if addr == objectCore.AddressOf(objects[i]) {
require.Equal(t, objects[i], obj)
@ -350,36 +314,6 @@ func TestEvacuateCancellation(t *testing.T) {
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
func TestEvacuateCancellationByError(t *testing.T) {
t.Parallel()
e, ids, _ := newEngineEvacuate(t, 2, 10)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
var once atomic.Bool
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
var err error
flag := true
if once.CompareAndSwap(false, true) {
err = errors.New("test error")
flag = false
}
return flag, err
}
prm.Scope = EvacuateScopeObjects
prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm)
require.ErrorContains(t, err, "test error")
}
func TestEvacuateSingleProcess(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
@ -597,7 +531,6 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
mutex := sync.Mutex{}
evacuatedTreeOps := make(map[string][]*pilorama.Move)
var prm EvacuateShardPrm
prm.ShardID = ids
@ -612,9 +545,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
if op.Time == 0 {
return true, "", nil
}
mutex.Lock()
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
mutex.Unlock()
height = op.Time + 1
}
}
@ -674,146 +605,3 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
}
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
// Create container with policy REP 2
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
x1 = netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(1)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
cnr1.SetAttribute("cnr", "cnr1")
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
// Create container with policy REP 1
cnr2 := container.Container{}
p2 := netmap.PlacementPolicy{}
p2.SetContainerBackupFactor(1)
x2 := netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
x2 = netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
cnr2.SetPlacementPolicy(p2)
cnr2.SetAttribute("cnr", "cnr2")
var idCnr2 cid.ID
container.CalculateID(&idCnr2, cnr2)
cnrmap[idCnr2] = &cnr2
cids = append(cids, idCnr2)
// Create container for simulate removing
cnr3 := container.Container{}
p3 := netmap.PlacementPolicy{}
p3.SetContainerBackupFactor(1)
x3 := netmap.ReplicaDescriptor{}
x3.SetNumberOfObjects(1)
p3.AddReplicas(x3)
cnr3.SetPlacementPolicy(p3)
cnr3.SetAttribute("cnr", "cnr3")
var idCnr3 cid.ID
container.CalculateID(&idCnr3, cnr3)
cids = append(cids, idCnr3)
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
for _, sh := range ids {
for j := range 3 {
for range 4 {
obj := testutil.GenerateObjectWithCID(cids[j])
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(8), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
}
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
t.Skip()
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
// Create containers with policy REP 1
for i := range 10_000 {
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
cnr1.SetAttribute("i", strconv.Itoa(i))
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
}
e.SetContainerSource(&containerStorage{
cntmap: cnrmap,
latency: time.Millisecond * 100,
})
for _, cnt := range cids {
for range 1 {
obj := testutil.GenerateObjectWithCID(cnt)
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[ids[0].String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
prm.ContainerWorkerCount = 10
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
start := time.Now()
_, err := e.Evacuate(context.Background(), prm)
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}

View file

@ -199,9 +199,7 @@ func TestLockExpiration(t *testing.T) {
require.NoError(t, err)
var inhumePrm InhumePrm
tombAddr := oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm)
@ -211,9 +209,7 @@ func TestLockExpiration(t *testing.T) {
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
// 4.
tombAddr = oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm)

View file

@ -169,16 +169,18 @@ func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.Sto
m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d)
}
func (m *writeCacheMetrics) SetEstimateSize(size uint64) {
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), size)
func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
}
func (m *writeCacheMetrics) SetMode(mod mode.ComponentMode) {
m.metrics.SetMode(m.shardID, mod.String())
}
func (m *writeCacheMetrics) SetActualCounters(count uint64) {
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), count)
func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db)
m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree)
}
func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {

View file

@ -67,7 +67,7 @@ func TestDB_Containers(t *testing.T) {
assertContains(cnrs, cnr)
require.NoError(t, metaInhume(db, object.AddressOf(obj), oidtest.ID()))
require.NoError(t, metaInhume(db, object.AddressOf(obj), oidtest.Address()))
cnrs, err = db.Containers(context.Background())
require.NoError(t, err)
@ -164,7 +164,7 @@ func TestDB_ContainerSize(t *testing.T) {
require.NoError(t, metaInhume(
db,
object.AddressOf(obj),
oidtest.ID(),
oidtest.Address(),
))
volume -= int(obj.PayloadSize())

View file

@ -41,7 +41,7 @@ func TestReset(t *testing.T) {
err = putBig(db, obj)
require.NoError(t, err)
err = metaInhume(db, addrToInhume, oidtest.ID())
err = metaInhume(db, addrToInhume, oidtest.Address())
require.NoError(t, err)
assertExists(addr, true, nil)

View file

@ -654,7 +654,7 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
return ErrReadOnlyMode
}
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerVolumeBucketName)
key := make([]byte, cidSize)
@ -737,7 +737,7 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
return ErrReadOnlyMode
}
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize)

View file

@ -156,18 +156,13 @@ func TestCounters(t *testing.T) {
}
var prm meta.InhumePrm
for _, o := range inhumedObjs {
tombAddr := oidtest.Address()
tombAddr.SetContainer(o.Container())
prm.SetTombstoneAddress(oidtest.Address())
prm.SetAddresses(inhumedObjs...)
prm.SetTombstoneAddress(tombAddr)
prm.SetAddresses(o)
res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(1), res.LogicInhumed())
require.Equal(t, uint64(1), res.UserInhumed())
}
res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
c, err := db.ObjectCounters()
require.NoError(t, err)
@ -301,16 +296,11 @@ func TestCounters(t *testing.T) {
}
var prm meta.InhumePrm
for _, o := range inhumedObjs {
tombAddr := oidtest.Address()
tombAddr.SetContainer(o.Container())
prm.SetTombstoneAddress(oidtest.Address())
prm.SetAddresses(inhumedObjs...)
prm.SetTombstoneAddress(tombAddr)
prm.SetAddresses(o)
_, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
}
_, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
c, err := db.ObjectCounters()
require.NoError(t, err)

View file

@ -77,6 +77,8 @@ func (p *DeletePrm) SetAddresses(addrs ...oid.Address) {
type referenceNumber struct {
all, cur int
addr oid.Address
obj *objectSDK.Object
}
@ -110,7 +112,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
var err error
var res DeleteRes
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
res, err = db.deleteGroup(tx, prm.addrs)
return err
})
@ -293,8 +295,9 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
nRef, ok := refCounter[k]
if !ok {
nRef = &referenceNumber{
all: parentLength(tx, parAddr),
obj: parent,
all: parentLength(tx, parAddr),
addr: parAddr,
obj: parent,
}
refCounter[k] = nRef

Some files were not shown because too many files have changed in this diff Show more