Compare commits

...

18 commits

Author SHA1 Message Date
f82b7e1ae3 [#1135] ir: Add healthstatus RECONFIGURING
All checks were successful
DCO action / DCO (pull_request) Successful in 1m57s
Vulncheck / Vulncheck (pull_request) Successful in 2m0s
Build / Build Components (1.21) (pull_request) Successful in 2m38s
Build / Build Components (1.22) (pull_request) Successful in 2m36s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m54s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m48s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m50s
Tests and linters / Tests with -race (pull_request) Successful in 2m53s
Tests and linters / Staticcheck (pull_request) Successful in 2m58s
Tests and linters / Lint (pull_request) Successful in 3m15s
Tests and linters / gopls check (pull_request) Successful in 3m33s
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-08-15 18:24:10 +03:00
4c520be9f1 [#1313] blobovnicza: Prevent concurrent Put/Close
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 1m59s
DCO action / DCO (pull_request) Successful in 1m53s
Build / Build Components (1.21) (pull_request) Successful in 2m20s
Build / Build Components (1.22) (pull_request) Successful in 2m26s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m56s
Tests and linters / Staticcheck (pull_request) Successful in 2m47s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m56s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m57s
Tests and linters / Lint (pull_request) Successful in 3m7s
Tests and linters / Tests with -race (pull_request) Successful in 3m7s
Tests and linters / gopls check (pull_request) Successful in 3m46s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-08-15 15:49:07 +03:00
8b4f3d82c9 [#1302] putSvc: Override SuccessAfter for non-regular objects in EC containers
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 10:06:18 +00:00
078b5fdd6e [#1302] writecache: Allow to specify custom page size
All checks were successful
DCO action / DCO (pull_request) Successful in 2m44s
Vulncheck / Vulncheck (pull_request) Successful in 2m49s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m31s
Build / Build Components (1.22) (pull_request) Successful in 3m23s
Build / Build Components (1.21) (pull_request) Successful in 3m25s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m41s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m41s
Tests and linters / Tests with -race (pull_request) Successful in 3m49s
Tests and linters / Staticcheck (pull_request) Successful in 3m52s
Tests and linters / Lint (pull_request) Successful in 4m15s
Tests and linters / gopls check (pull_request) Successful in 4m19s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 12:39:11 +03:00
b3ab9589a5 [#1302] writecache: Add put->flush->put benchmark
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 12:35:05 +03:00
54862250b2 [#1295] getSvc: Assemble complex EC object headers without linking object
All checks were successful
DCO action / DCO (pull_request) Successful in 1m30s
Vulncheck / Vulncheck (pull_request) Successful in 1m46s
Build / Build Components (1.22) (pull_request) Successful in 2m18s
Build / Build Components (1.21) (pull_request) Successful in 2m27s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m46s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m46s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m48s
Tests and linters / Staticcheck (pull_request) Successful in 2m42s
Tests and linters / Tests with -race (pull_request) Successful in 2m51s
Tests and linters / Lint (pull_request) Successful in 3m9s
Tests and linters / gopls check (pull_request) Successful in 3m20s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-06 17:51:23 +03:00
89d670c2f0 [#1295] engine: Resolve funlen linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-06 17:51:06 +03:00
87c5954f4e [#1295] engine: Log object address in case of error
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-06 17:50:48 +03:00
28fc41bd98 [#1291] morph: Reconnect to the highest priority endpoint
All checks were successful
DCO action / DCO (pull_request) Successful in 1m50s
Vulncheck / Vulncheck (pull_request) Successful in 2m7s
Build / Build Components (1.21) (pull_request) Successful in 2m28s
Build / Build Components (1.22) (pull_request) Successful in 2m25s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m4s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m7s
Tests and linters / Staticcheck (pull_request) Successful in 3m0s
Tests and linters / Lint (pull_request) Successful in 3m16s
Tests and linters / Tests with -race (pull_request) Successful in 3m15s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m30s
Tests and linters / gopls check (pull_request) Successful in 4m10s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-05 13:17:44 +03:00
20f308876c [#1288] putSvc: Respect TTL for EC put
All checks were successful
DCO action / DCO (pull_request) Successful in 3m6s
Vulncheck / Vulncheck (pull_request) Successful in 3m10s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m33s
Build / Build Components (1.22) (pull_request) Successful in 3m47s
Build / Build Components (1.21) (pull_request) Successful in 3m51s
Tests and linters / Staticcheck (pull_request) Successful in 4m0s
Tests and linters / Tests with -race (pull_request) Successful in 4m5s
Tests and linters / Lint (pull_request) Successful in 4m18s
Tests and linters / gopls check (pull_request) Successful in 5m14s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m57s
Pre-commit hooks / Pre-commit (pull_request) Successful in 57s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-02 17:55:54 +03:00
02d191e9a6 [#1279] adm: Interpret "root" name as empty for namespace target type
All checks were successful
DCO action / DCO (pull_request) Successful in 1m35s
Vulncheck / Vulncheck (pull_request) Successful in 1m57s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m11s
Build / Build Components (1.22) (pull_request) Successful in 2m26s
Build / Build Components (1.21) (pull_request) Successful in 2m32s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m30s
Tests and linters / Staticcheck (pull_request) Successful in 2m45s
Tests and linters / Tests with -race (pull_request) Successful in 2m54s
Tests and linters / Lint (pull_request) Successful in 3m8s
Tests and linters / gopls check (pull_request) Successful in 4m16s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m57s
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-08-01 16:11:44 +03:00
cecb3f7867 [#1282] cli: Allow to external addresses first for object nodes
All checks were successful
DCO action / DCO (pull_request) Successful in 1m12s
Build / Build Components (1.22) (pull_request) Successful in 2m14s
Build / Build Components (1.21) (pull_request) Successful in 2m14s
Vulncheck / Vulncheck (pull_request) Successful in 1m56s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m44s
Tests and linters / Staticcheck (pull_request) Successful in 2m49s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m53s
Tests and linters / Lint (pull_request) Successful in 3m15s
Tests and linters / Tests with -race (pull_request) Successful in 3m22s
Tests and linters / gopls check (pull_request) Successful in 3m34s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m58s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-01 09:18:18 +03:00
f53d30fa95 [#1278] containerSvc: Validate FrostFSID subject exitence on Put
All checks were successful
Build / Build Components (1.21) (pull_request) Successful in 1m37s
Build / Build Components (1.22) (pull_request) Successful in 2m49s
Vulncheck / Vulncheck (pull_request) Successful in 2m25s
Tests and linters / Tests with -race (pull_request) Successful in 5m21s
Tests and linters / Tests (1.22) (pull_request) Successful in 5m48s
DCO action / DCO (pull_request) Successful in 31s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m12s
Tests and linters / Staticcheck (pull_request) Successful in 1m44s
Tests and linters / Lint (pull_request) Successful in 2m20s
Tests and linters / gopls check (pull_request) Successful in 2m33s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m36s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-29 16:05:10 +03:00
1b92817bd3 [#1278] ir: Do not allow to create container without FrostFSID record
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-29 16:04:53 +03:00
32ec421ac7 [#1277] go.mod: Update api-go
All checks were successful
DCO action / DCO (pull_request) Successful in 1m24s
Vulncheck / Vulncheck (pull_request) Successful in 1m26s
Build / Build Components (1.21) (pull_request) Successful in 2m17s
Build / Build Components (1.22) (pull_request) Successful in 2m9s
Tests and linters / Staticcheck (pull_request) Successful in 3m1s
Tests and linters / gopls check (pull_request) Successful in 3m29s
Tests and linters / Lint (pull_request) Successful in 4m16s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m27s
Tests and linters / Tests with -race (pull_request) Successful in 7m5s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m1s
Tests and linters / Tests (1.22) (pull_request) Successful in 8m2s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-07-26 17:27:41 +03:00
4d102b05e5 [#1274] go.mod: Update neo-go version that fixes ws-client
All checks were successful
DCO action / DCO (pull_request) Successful in 1m17s
Build / Build Components (1.21) (pull_request) Successful in 1m59s
Build / Build Components (1.22) (pull_request) Successful in 2m9s
Vulncheck / Vulncheck (pull_request) Successful in 1m45s
Tests and linters / Staticcheck (pull_request) Successful in 2m45s
Tests and linters / gopls check (pull_request) Successful in 3m18s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m2s
Tests and linters / Lint (pull_request) Successful in 4m58s
Tests and linters / Tests with -race (pull_request) Successful in 5m4s
Tests and linters / Tests (1.21) (pull_request) Successful in 7m46s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m48s
* Update go.mod;
* This neo-go package version contains fix for the wsclient that
  allows to morph event listener refresh the invalidated websocket
  connection to neo-go.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-07-26 15:52:03 +03:00
308da7cb01 [#1271] getSvc: Fix local EC chunk get
All checks were successful
DCO action / DCO (pull_request) Successful in 1m2s
Vulncheck / Vulncheck (pull_request) Successful in 1m22s
Build / Build Components (1.22) (pull_request) Successful in 2m36s
Build / Build Components (1.21) (pull_request) Successful in 2m36s
Tests and linters / Staticcheck (pull_request) Successful in 3m19s
Tests and linters / gopls check (pull_request) Successful in 3m27s
Tests and linters / Lint (pull_request) Successful in 4m22s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m14s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m12s
Tests and linters / Tests (1.22) (pull_request) Successful in 8m14s
Tests and linters / Tests with -race (pull_request) Successful in 8m29s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 10:23:35 +03:00
37b83c0856 [#1271] getSvc: Fix head --raw assemble for EC
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 10:23:15 +03:00
68 changed files with 320 additions and 556 deletions

View file

@ -38,6 +38,12 @@ var (
func parseTarget(cmd *cobra.Command) policyengine.Target { func parseTarget(cmd *cobra.Command) policyengine.Target {
name, _ := cmd.Flags().GetString(targetNameFlag) name, _ := cmd.Flags().GetString(targetNameFlag)
typ, err := parseTargetType(cmd) typ, err := parseTargetType(cmd)
// interpret "root" namespace as empty
if typ == policyengine.Namespace && name == "root" {
name = ""
}
commonCmd.ExitOnErr(cmd, "read target type error: %w", err) commonCmd.ExitOnErr(cmd, "read target type error: %w", err)
return policyengine.Target{ return policyengine.Target{

View file

@ -214,29 +214,6 @@ func EACL(ctx context.Context, prm EACLPrm) (res EACLRes, err error) {
return return
} }
// SetEACLPrm groups parameters of SetEACL operation.
type SetEACLPrm struct {
Client *client.Client
ClientParams client.PrmContainerSetEACL
}
// SetEACLRes groups the resulting values of SetEACL operation.
type SetEACLRes struct{}
// SetEACL requests to save an eACL table in FrostFS.
//
// Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable.
//
// Success can be verified by reading by container identifier.
//
// Returns any error which prevented the operation from completing correctly in error return.
func SetEACL(ctx context.Context, prm SetEACLPrm) (res SetEACLRes, err error) {
_, err = prm.Client.ContainerSetEACL(ctx, prm.ClientParams)
return
}
// NetworkInfoPrm groups parameters of NetworkInfo operation. // NetworkInfoPrm groups parameters of NetworkInfo operation.
type NetworkInfoPrm struct { type NetworkInfoPrm struct {
Client *client.Client Client *client.Client

View file

@ -26,7 +26,6 @@ func init() {
listContainerObjectsCmd, listContainerObjectsCmd,
getContainerInfoCmd, getContainerInfoCmd,
getExtendedACLCmd, getExtendedACLCmd,
setExtendedACLCmd,
containerNodesCmd, containerNodesCmd,
policyPlaygroundCmd, policyPlaygroundCmd,
} }
@ -39,7 +38,6 @@ func init() {
initContainerListObjectsCmd() initContainerListObjectsCmd()
initContainerInfoCmd() initContainerInfoCmd()
initContainerGetEACLCmd() initContainerGetEACLCmd()
initContainerSetEACLCmd()
initContainerNodesCmd() initContainerNodesCmd()
initContainerPolicyPlaygroundCmd() initContainerPolicyPlaygroundCmd()
@ -53,7 +51,6 @@ func init() {
}{ }{
{createContainerCmd, "PUT"}, {createContainerCmd, "PUT"},
{deleteContainerCmd, "DELETE"}, {deleteContainerCmd, "DELETE"},
{setExtendedACLCmd, "SETEACL"},
} { } {
commonflags.InitSession(el.cmd, "container "+el.verb) commonflags.InitSession(el.cmd, "container "+el.verb)
} }

View file

@ -1,108 +0,0 @@
package container
import (
"bytes"
"errors"
"time"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/spf13/cobra"
)
var flagVarsSetEACL struct {
noPreCheck bool
srcPath string
}
var setExtendedACLCmd = &cobra.Command{
Use: "set-eacl",
Short: "Set new extended ACL table for container",
Long: `Set new extended ACL table for container.
Container ID in EACL table will be substituted with ID from the CLI.`,
Run: func(cmd *cobra.Command, _ []string) {
id := parseContainerID(cmd)
eaclTable := common.ReadEACL(cmd, flagVarsSetEACL.srcPath)
tok := getSession(cmd)
eaclTable.SetCID(id)
pk := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
if !flagVarsSetEACL.noPreCheck {
cmd.Println("Checking the ability to modify access rights in the container...")
extendable, err := internalclient.IsACLExtendable(cmd.Context(), cli, id)
commonCmd.ExitOnErr(cmd, "Extensibility check failure: %w", err)
if !extendable {
commonCmd.ExitOnErr(cmd, "", errors.New("container ACL is immutable"))
}
cmd.Println("ACL extension is enabled in the container, continue processing.")
}
setEACLPrm := internalclient.SetEACLPrm{
Client: cli,
ClientParams: client.PrmContainerSetEACL{
Table: eaclTable,
Session: tok,
},
}
_, err := internalclient.SetEACL(cmd.Context(), setEACLPrm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
if containerAwait {
exp, err := eaclTable.Marshal()
commonCmd.ExitOnErr(cmd, "broken EACL table: %w", err)
cmd.Println("awaiting...")
getEACLPrm := internalclient.EACLPrm{
Client: cli,
ClientParams: client.PrmContainerEACL{
ContainerID: &id,
},
}
for i := 0; i < awaitTimeout; i++ {
time.Sleep(1 * time.Second)
res, err := internalclient.EACL(cmd.Context(), getEACLPrm)
if err == nil {
// compare binary values because EACL could have been set already
table := res.EACL()
got, err := table.Marshal()
if err != nil {
continue
}
if bytes.Equal(exp, got) {
cmd.Println("EACL has been persisted on sidechain")
return
}
}
}
commonCmd.ExitOnErr(cmd, "", errSetEACLTimeout)
}
},
}
func initContainerSetEACLCmd() {
commonflags.Init(setExtendedACLCmd)
flags := setExtendedACLCmd.Flags()
flags.StringVar(&containerID, commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
flags.StringVar(&flagVarsSetEACL.srcPath, "table", "", "path to file with JSON or binary encoded EACL table")
flags.BoolVar(&containerAwait, "await", false, "block execution until EACL is persisted")
flags.BoolVar(&flagVarsSetEACL.noPreCheck, "no-precheck", false, "do not pre-check the extensibility of the container ACL")
}

View file

@ -18,9 +18,8 @@ const (
) )
var ( var (
errCreateTimeout = errors.New("timeout: container has not been persisted on sidechain") errCreateTimeout = errors.New("timeout: container has not been persisted on sidechain")
errDeleteTimeout = errors.New("timeout: container has not been removed from sidechain") errDeleteTimeout = errors.New("timeout: container has not been removed from sidechain")
errSetEACLTimeout = errors.New("timeout: EACL has not been persisted on sidechain")
) )
func parseContainerID(cmd *cobra.Command) cid.ID { func parseContainerID(cmd *cobra.Command) cid.ID {

View file

@ -30,7 +30,8 @@ import (
) )
const ( const (
verifyPresenceAllFlag = "verify-presence-all" verifyPresenceAllFlag = "verify-presence-all"
preferInternalAddressesFlag = "prefer-internal-addresses"
) )
var ( var (
@ -97,6 +98,7 @@ func initObjectNodesCmd() {
flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes.") flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes.")
flags.Bool(commonflags.JSON, false, "Print information about the object placement as json.") flags.Bool(commonflags.JSON, false, "Print information about the object placement as json.")
flags.Bool(preferInternalAddressesFlag, false, "Use internal addresses first to get object info.")
} }
func objectNodes(cmd *cobra.Command, _ []string) { func objectNodes(cmd *cobra.Command, _ []string) {
@ -449,11 +451,20 @@ func getNodesToCheckObjectExistance(cmd *cobra.Command, netmap *netmapSDK.NetMap
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) { func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
var cli *client.Client var cli *client.Client
var addresses []string var addresses []string
candidate.IterateNetworkEndpoints(func(s string) bool { if preferInternal, _ := cmd.Flags().GetBool(preferInternalAddressesFlag); preferInternal {
addresses = append(addresses, s) candidate.IterateNetworkEndpoints(func(s string) bool {
return false addresses = append(addresses, s)
}) return false
addresses = append(addresses, candidate.ExternalAddresses()...) })
addresses = append(addresses, candidate.ExternalAddresses()...)
} else {
addresses = append(addresses, candidate.ExternalAddresses()...)
candidate.IterateNetworkEndpoints(func(s string) bool {
addresses = append(addresses, s)
return false
})
}
var lastErr error var lastErr error
for _, address := range addresses { for _, address := range addresses {
var networkAddr network.Address var networkAddr network.Address

View file

@ -7,6 +7,7 @@ import (
configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config" configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -81,6 +82,10 @@ func watchForSignal(cancel func()) {
return return
case <-sighupCh: case <-sighupCh:
log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration) log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)
if !innerRing.CompareAndSwapHealthStatus(control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
log.Info(logs.FrostFSNodeSIGHUPSkip)
break
}
err := reloadConfig() err := reloadConfig()
if err != nil { if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err)) log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
@ -92,6 +97,7 @@ func watchForSignal(cancel func()) {
if err != nil { if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err)) log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
} }
innerRing.CompareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
} }
} }

View file

@ -25,7 +25,7 @@ func init() {
func inspectFunc(cmd *cobra.Command, _ []string) { func inspectFunc(cmd *cobra.Command, _ []string) {
var data []byte var data []byte
db, err := writecache.OpenDB(vPath, true, os.OpenFile) db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close() defer db.Close()

View file

@ -31,7 +31,7 @@ func listFunc(cmd *cobra.Command, _ []string) {
return err return err
} }
db, err := writecache.OpenDB(vPath, true, os.OpenFile) db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err)) common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close() defer db.Close()

View file

@ -153,6 +153,7 @@ type shardCfg struct {
flushWorkerCount int flushWorkerCount int
sizeLimit uint64 sizeLimit uint64
noSync bool noSync bool
pageSize int
} }
piloramaCfg struct { piloramaCfg struct {
@ -271,6 +272,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
wc.path = writeCacheCfg.Path() wc.path = writeCacheCfg.Path()
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.pageSize = writeCacheCfg.BoltDB().PageSize()
wc.maxObjSize = writeCacheCfg.MaxObjectSize() wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize() wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount() wc.flushWorkerCount = writeCacheCfg.WorkerCount()
@ -863,6 +865,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
writecache.WithPath(wcRead.path), writecache.WithPath(wcRead.path),
writecache.WithMaxBatchSize(wcRead.maxBatchSize), writecache.WithMaxBatchSize(wcRead.maxBatchSize),
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
writecache.WithPageSize(wcRead.pageSize),
writecache.WithMaxObjectSize(wcRead.maxObjSize), writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithSmallObjectSize(wcRead.smallObjectSize), writecache.WithSmallObjectSize(wcRead.smallObjectSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),

View file

@ -78,6 +78,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 3221225472, wc.SizeLimit()) require.EqualValues(t, 3221225472, wc.SizeLimit())
require.EqualValues(t, 4096, wc.BoltDB().PageSize())
require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@ -133,6 +134,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 134217728, wc.MaxObjectSize()) require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount()) require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 4294967296, wc.SizeLimit()) require.EqualValues(t, 4294967296, wc.SizeLimit())
require.EqualValues(t, 0, wc.BoltDB().PageSize())
require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@ -60,3 +60,14 @@ func (x *Config) MaxBatchSize() int {
func (x *Config) NoSync() bool { func (x *Config) NoSync() bool {
return config.BoolSafe((*config.Config)(x), "no_sync") return config.BoolSafe((*config.Config)(x), "no_sync")
} }
// PageSize returns the value of "page_size" config parameter.
//
// Returns 0 if the value is not a positive number.
func (x *Config) PageSize() int {
s := int(config.SizeInBytesSafe((*config.Config)(x), "page_size"))
if s < 0 {
s = 0
}
return s
}

View file

@ -105,6 +105,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728 FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30 FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472 FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
### Metabase config ### Metabase config
FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644 FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644

View file

@ -148,7 +148,8 @@
"small_object_size": 16384, "small_object_size": 16384,
"max_object_size": 134217728, "max_object_size": 134217728,
"flush_worker_count": 30, "flush_worker_count": 30,
"capacity": 3221225472 "capacity": 3221225472,
"page_size": 4096
}, },
"metabase": { "metabase": {
"path": "tmp/0/meta", "path": "tmp/0/meta",

View file

@ -171,6 +171,7 @@ storage:
no_sync: true no_sync: true
path: tmp/0/cache # write-cache root directory path: tmp/0/cache # write-cache root directory
capacity: 3221225472 # approximate write-cache total size, bytes capacity: 3221225472 # approximate write-cache total size, bytes
page_size: 4k
metabase: metabase:
path: tmp/0/meta # metabase path path: tmp/0/meta # metabase path

View file

@ -290,6 +290,7 @@ writecache:
small_object_size: 16384 small_object_size: 16384
max_object_size: 134217728 max_object_size: 134217728
flush_worker_count: 30 flush_worker_count: 30
page_size: '4k'
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
@ -301,6 +302,7 @@ writecache:
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. | | `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. | | `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. | | `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
| `page_size` | `size` | `0` | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. |
# `node` section # `node` section

10
go.mod
View file

@ -4,11 +4,11 @@ go 1.21
require ( require (
code.gitea.io/sdk/gitea v0.17.1 code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240530152826-2f6d3209e1d3 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240617140730-1a5886e776de git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
@ -23,7 +23,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.2.0 github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1 github.com/multiformats/go-multiaddr v0.12.1
github.com/nspcc-dev/neo-go v0.106.0 github.com/nspcc-dev/neo-go v0.106.2
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0 github.com/panjf2000/ants/v2 v2.9.0
github.com/paulmach/orb v0.11.0 github.com/paulmach/orb v0.11.0
@ -38,7 +38,7 @@ require (
go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/trace v1.22.0 go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/sync v0.6.0 golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0 golang.org/x/sys v0.18.0
golang.org/x/term v0.18.0 golang.org/x/term v0.18.0
@ -128,4 +128,4 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect rsc.io/tmplfunc v0.0.3 // indirect
) )
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240611123832-594f716b3d18 replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928

BIN
go.sum

Binary file not shown.

View file

@ -11,7 +11,6 @@ import (
// Client is an interface of FrostFS storage // Client is an interface of FrostFS storage
// node's client. // node's client.
type Client interface { type Client interface {
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error) ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error) ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error)
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error) ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)

View file

@ -3,7 +3,6 @@ package container
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"encoding/hex" "encoding/hex"
"fmt"
"testing" "testing"
"time" "time"
@ -238,5 +237,5 @@ func (c *testMorphClient) NotarySignAndInvokeTX(mainTx *transaction.Transaction)
type testFrostFSIDClient struct{} type testFrostFSIDClient struct{}
func (c *testFrostFSIDClient) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error) { func (c *testFrostFSIDClient) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error) {
return nil, fmt.Errorf("subject not found") return &frostfsidclient.Subject{}, nil
} }

View file

@ -180,11 +180,6 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
} }
} }
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
if !hasNamespace {
return nil
}
addr, err := util.Uint160DecodeBytesBE(cnr.Owner().WalletBytes()[1 : 1+util.Uint160Size]) addr, err := util.Uint160DecodeBytesBE(cnr.Owner().WalletBytes()[1 : 1+util.Uint160Size])
if err != nil { if err != nil {
return fmt.Errorf("could not get container owner address: %w", err) return fmt.Errorf("could not get container owner address: %w", err)
@ -195,6 +190,11 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
return fmt.Errorf("could not get subject from FrostfsID contract: %w", err) return fmt.Errorf("could not get subject from FrostfsID contract: %w", err)
} }
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
if !hasNamespace {
return nil
}
if subject.Namespace != namespace { if subject.Namespace != namespace {
return errContainerAndOwnerNamespaceDontMatch return errContainerAndOwnerNamespaceDontMatch
} }

View file

@ -161,6 +161,16 @@ func (s *Server) setHealthStatus(hs control.HealthStatus) {
} }
} }
func (s *Server) CompareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
if swapped = s.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
s.notifySystemd(newSt)
if s.irMetrics != nil {
s.irMetrics.SetHealth(int32(newSt))
}
}
return
}
// HealthStatus returns the current health status of the IR application. // HealthStatus returns the current health status of the IR application.
func (s *Server) HealthStatus() control.HealthStatus { func (s *Server) HealthStatus() control.HealthStatus {
return control.HealthStatus(s.healthStatus.Load()) return control.HealthStatus(s.healthStatus.Load())
@ -186,6 +196,8 @@ func (s *Server) notifySystemd(st control.HealthStatus) {
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled) err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
case control.HealthStatus_SHUTTING_DOWN: case control.HealthStatus_SHUTTING_DOWN:
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled) err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
case control.HealthStatus_RECONFIGURING:
err = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
default: default:
err = sdnotify.Status(fmt.Sprintf("%v", st)) err = sdnotify.Status(fmt.Sprintf("%v", st))
} }

View file

@ -22,7 +22,7 @@ type Blobovnicza struct {
boltDB *bbolt.DB boltDB *bbolt.DB
opened bool opened bool
controlMtx sync.Mutex controlMtx sync.RWMutex
} }
// Option is an option of Blobovnicza's constructor. // Option is an option of Blobovnicza's constructor.

View file

@ -45,6 +45,9 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
addrKey := addressKey(prm.addr) addrKey := addressKey(prm.addr)
found := false found := false

View file

@ -21,6 +21,9 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
addrKey := addressKey(addr) addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error { err := b.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -51,6 +51,9 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
var ( var (
data []byte data []byte
addrKey = addressKey(prm.addr) addrKey = addressKey(prm.addr)

View file

@ -128,6 +128,9 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
var elem IterationElement var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -29,6 +29,9 @@ func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
key := addressKey(prm.Address) key := addressKey(prm.Address)
err := b.boltDB.Update(func(tx *bbolt.Tx) error { err := b.boltDB.Update(func(tx *bbolt.Tx) error {

View file

@ -64,6 +64,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
sz := uint64(len(prm.objData)) sz := uint64(len(prm.objData))
bucketName := bucketForSize(sz) bucketName := bucketForSize(sz)
key := addressKey(prm.addr) key := addressKey(prm.addr)

View file

@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
return false return false
} else { } else {
if !client.IsErrObjectNotFound(err) { if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check object existence", err) e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
} }
return false return false
} }
@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
_, err = sh.Inhume(ctx, shPrm) _, err = sh.Inhume(ctx, shPrm)
if err != nil { if err != nil {
e.reportShardError(sh, "could not inhume object in shard", err) e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))
var target *apistatus.ObjectLocked var target *apistatus.ObjectLocked
locked.is = errors.As(err, &target) locked.is = errors.As(err, &target)
@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
var objID oid.ID var objID oid.ID
err := objID.ReadFromV2(chunk.ID) err := objID.ReadFromV2(chunk.ID)
if err != nil { if err != nil {
e.reportShardError(sh, "could not delete EC chunk", err) e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
} }
addr.SetObject(objID) addr.SetObject(objID)
inhumePrm.MarkAsGarbage(addr) inhumePrm.MarkAsGarbage(addr)

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
) )
// exists return in the first value true if object exists. // exists return in the first value true if object exists.
@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
} }
if !client.IsErrObjectNotFound(err) { if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err) e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
} }
return false return false
} }

View file

@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
i.ObjectExpired = true i.ObjectExpired = true
return true return true
default: default:
i.Engine.reportShardError(sh, "could not get object from shard", err) i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
return false return false
} }
}) })

View file

@ -12,6 +12,7 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
) )
// HeadPrm groups the parameters of Head operation. // HeadPrm groups the parameters of Head operation.
@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
outError = new(apistatus.ObjectNotFound) outError = new(apistatus.ObjectNotFound)
return true return true
default: default:
e.reportShardError(sh, "could not head object from shard", err) e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
return false return false
} }
} }
@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
return true return true
}) })
if head != nil {
return HeadRes{head: head}, nil
}
if outSI != nil { if outSI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
} else if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
} else if head == nil {
return HeadRes{}, outError
} }
if outEI != nil {
return HeadRes{ return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
head: head, }
}, nil return HeadRes{}, outError
} }
// Head reads object header from local storage by provided address. // Head reads object header from local storage by provided address.

View file

@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError var ecErr *objectSDK.ECInfoError
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) { if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
e.reportShardError(sh, "could not check for presents in shard", err) e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
return return
} }
@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
return true return true
} }
e.reportShardError(sh, "could not inhume object in shard", err) e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
return false return false
} }
@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
locked, err = h.Shard.IsLocked(ctx, addr) locked, err = h.Shard.IsLocked(ctx, addr)
if err != nil { if err != nil {
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr), e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err outErr = err
return false return false
@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
ld, err := h.Shard.GetLocked(ctx, addr) ld, err := h.Shard.GetLocked(ctx, addr)
if err != nil { if err != nil {
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr), e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err outErr = err
} }

View file

@ -13,6 +13,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
var errLockFailed = errors.New("lock operation failed") var errLockFailed = errors.New("lock operation failed")
@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) { func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
// code is pretty similar to inhumeAddr, maybe unify? // code is pretty similar to inhumeAddr, maybe unify?
root := false root := false
var addrLocked oid.Address var addrLocked oid.Address
addrLocked.SetContainer(idCnr) addrLocked.SetContainer(idCnr)
addrLocked.SetObject(locked) addrLocked.SetObject(locked)
e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
defer func() { defer func() {
// if object is root we continue since information about it // if object is root we continue since information about it
@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
if checkExists { if checkExists {
var existsPrm shard.ExistsPrm var existsPrm shard.ExistsPrm
existsPrm.Address = addrLocked existsPrm.Address = addrLocked
exRes, err := sh.Exists(ctx, existsPrm) exRes, err := sh.Exists(ctx, existsPrm)
if err != nil { if err != nil {
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
var objID oid.ID var objID oid.ID
err = objID.ReadFromV2(chunk.ID) err = objID.ReadFromV2(chunk.ID)
if err != nil { if err != nil {
e.reportShardError(sh, "could not lock object in shard", err) e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return false return false
} }
eclocked = append(eclocked, objID) eclocked = append(eclocked, objID)
} }
err = sh.Lock(ctx, idCnr, locker, eclocked) err = sh.Lock(ctx, idCnr, locker, eclocked)
if err != nil { if err != nil {
e.reportShardError(sh, "could not lock object in shard", err) e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return false return false
} }
root = true root = true
@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
// do not lock it // do not lock it
return true return true
} }
e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
e.reportShardError(sh, "could not check locked object for presence in shard", err) zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return return
} }
@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked}) err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
if err != nil { if err != nil {
e.reportShardError(sh, "could not lock object in shard", err) e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
var errIrregular *apistatus.LockNonRegularObject var errIrregular *apistatus.LockNonRegularObject
if errors.As(err, &errIrregular) { if errors.As(err, &errIrregular) {
status = 1 status = 1
return true return true
} }
return false return false
} }
status = 2 status = 2
return true return true
}) })
return return
} }

View file

@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
return return
} }
e.reportShardError(sh, "could not put object to shard", err) e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
return return
} }

View file

@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
return true // stop, return it back return true // stop, return it back
default: default:
i.Engine.reportShardError(sh, "could not get object from shard", err) i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
return false return false
} }
}) })

View file

@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
// pick last item, for now there is not difference which address to pick var data []byte
// but later list might be sorted so first or last value can be more for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
// prioritized to choose virtualOID := relativeLst[len(relativeLst)-i-1]
virtualOID := relativeLst[len(relativeLst)-1] data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) }
if len(data) == 0 {
// check if any of the relatives is an EC object
for _, relative := range relativeLst {
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
if len(data) > 0 {
// we can't return object headers, but can return error,
// so assembler can try to assemble complex object
return nil, getSplitInfoError(tx, cnr, key)
}
}
}
child := objectSDK.New() child := objectSDK.New()

View file

@ -2,6 +2,7 @@ package benchmark
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -27,6 +28,24 @@ func BenchmarkWritecachePar(b *testing.B) {
}) })
} }
func BenchmarkWriteAfterDelete(b *testing.B) {
const payloadSize = 32 << 10
const parallel = 25
cache := newCache(b)
benchmarkPutPrepare(b, cache)
b.Run(fmt.Sprintf("%dB_before", payloadSize), func(b *testing.B) {
b.SetParallelism(parallel)
benchmarkRunPar(b, cache, payloadSize)
})
require.NoError(b, cache.Flush(context.Background(), false, false))
b.Run(fmt.Sprintf("%dB_after", payloadSize), func(b *testing.B) {
b.SetParallelism(parallel)
benchmarkRunPar(b, cache, payloadSize)
})
require.NoError(b, cache.Close())
}
func benchmarkPutSeq(b *testing.B, cache writecache.Cache, size uint64) { func benchmarkPutSeq(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache) benchmarkPutPrepare(b, cache)
defer func() { require.NoError(b, cache.Close()) }() defer func() { require.NoError(b, cache.Close()) }()
@ -54,6 +73,10 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache) benchmarkPutPrepare(b, cache)
defer func() { require.NoError(b, cache.Close()) }() defer func() { require.NoError(b, cache.Close()) }()
benchmarkRunPar(b, cache, size)
}
func benchmarkRunPar(b *testing.B, cache writecache.Cache, size uint64) {
ctx := context.Background() ctx := context.Background()
b.ResetTimer() b.ResetTimer()

View file

@ -45,6 +45,8 @@ type options struct {
metrics Metrics metrics Metrics
// disableBackgroundFlush is for testing purposes only. // disableBackgroundFlush is for testing purposes only.
disableBackgroundFlush bool disableBackgroundFlush bool
// pageSize is bbolt's page size config value
pageSize int
} }
// WithLogger sets logger. // WithLogger sets logger.
@ -163,3 +165,10 @@ func WithDisableBackgroundFlush() Option {
o.disableBackgroundFlush = true o.disableBackgroundFlush = true
} }
} }
// WithPageSize sets bbolt's page size.
func WithPageSize(s int) Option {
return func(o *options) {
o.pageSize = s
}
}

View file

@ -32,7 +32,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
return err return err
} }
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile) c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize)
if err != nil { if err != nil {
return fmt.Errorf("could not open database: %w", err) return fmt.Errorf("could not open database: %w", err)
} }

View file

@ -10,11 +10,12 @@ import (
) )
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. // OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true, NoFreelistSync: true,
ReadOnly: ro, ReadOnly: ro,
Timeout: 100 * time.Millisecond, Timeout: 100 * time.Millisecond,
OpenFile: openFile, OpenFile: openFile,
PageSize: pageSize,
}) })
} }

View file

@ -148,6 +148,10 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
} else { } else {
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint, cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
zap.String("endpoint", endpoint.Address)) zap.String("endpoint", endpoint.Address))
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
cli.switchIsActive.Store(true)
go cli.switchToMostPrioritized(ctx)
}
break break
} }
} }

View file

@ -1,53 +1,13 @@
package container package container
import ( import (
"crypto/sha256"
"fmt" "fmt"
v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
) )
// AnnounceLoadPrm groups parameters of AnnounceLoad operation.
type AnnounceLoadPrm struct {
a container.SizeEstimation
key []byte
client.InvokePrmOptional
}
// SetAnnouncement sets announcement.
func (a2 *AnnounceLoadPrm) SetAnnouncement(a container.SizeEstimation) {
a2.a = a
}
// SetReporter sets public key of the reporter.
func (a2 *AnnounceLoadPrm) SetReporter(key []byte) {
a2.key = key
}
// AnnounceLoad saves container size estimation calculated by storage node
// with key in FrostFS system through Container contract call.
//
// Returns any error encountered that caused the saving to interrupt.
func (c *Client) AnnounceLoad(p AnnounceLoadPrm) error {
binCnr := make([]byte, sha256.Size)
p.a.Container().Encode(binCnr)
prm := client.InvokePrm{}
prm.SetMethod(putSizeMethod)
prm.SetArgs(p.a.Epoch(), binCnr, p.a.Value(), p.key)
prm.InvokePrmOptional = p.InvokePrmOptional
_, err := c.client.Invoke(prm)
if err != nil {
return fmt.Errorf("could not invoke method (%s): %w", putSizeMethod, err)
}
return nil
}
// EstimationID is an identity of container load estimation inside Container contract. // EstimationID is an identity of container load estimation inside Container contract.
type EstimationID []byte type EstimationID []byte

View file

@ -239,15 +239,6 @@ func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPut
return return
} }
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ContainerAnnounceUsedSpace(ctx, prm)
return err
})
return
}
func (x *multiClient) ObjectDelete(ctx context.Context, p client.PrmObjectDelete) (res *client.ResObjectDelete, err error) { func (x *multiClient) ObjectDelete(ctx context.Context, p client.PrmObjectDelete) (res *client.ResObjectDelete, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error { err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ObjectDelete(ctx, p) res, err = c.ObjectDelete(ctx, p)

View file

@ -81,21 +81,6 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
} }
// SetExtendedACL converts gRPC SetExtendedACLRequest message and passes it to internal Container service.
func (s *Server) SetExtendedACL(ctx context.Context, req *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) {
setEACLReq := new(container.SetExtendedACLRequest)
if err := setEACLReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.SetExtendedACL(ctx, setEACLReq)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*containerGRPC.SetExtendedACLResponse), nil
}
// GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service. // GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service.
func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) { func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) {
getEACLReq := new(container.GetExtendedACLRequest) getEACLReq := new(container.GetExtendedACLRequest)
@ -110,18 +95,3 @@ func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExten
return resp.ToGRPCMessage().(*containerGRPC.GetExtendedACLResponse), nil return resp.ToGRPCMessage().(*containerGRPC.GetExtendedACLResponse), nil
} }
// AnnounceUsedSpace converts gRPC AnnounceUsedSpaceRequest message and passes it to internal Container service.
func (s *Server) AnnounceUsedSpace(ctx context.Context, req *containerGRPC.AnnounceUsedSpaceRequest) (*containerGRPC.AnnounceUsedSpaceResponse, error) {
announceReq := new(container.AnnounceUsedSpaceRequest)
if err := announceReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.AnnounceUsedSpace(ctx, announceReq)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*containerGRPC.AnnounceUsedSpaceResponse), nil
}

View file

@ -78,15 +78,6 @@ func NewAPEServer(router policyengine.ChainRouter, reader containers, ir ir, nm
} }
} }
func (ac *apeChecker) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.AnnounceUsedSpace")
defer span.End()
// this method is not used, so not checked
return ac.next.AnnounceUsedSpace(ctx, req)
}
func (ac *apeChecker) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { func (ac *apeChecker) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Delete") ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Delete")
defer span.End() defer span.End()
@ -220,7 +211,7 @@ func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*cont
} }
} }
namespace, err := ac.namespaceByOwner(req.GetBody().GetContainer().GetOwnerID()) namespace, err := ac.namespaceByKnownOwner(req.GetBody().GetContainer().GetOwnerID())
if err != nil { if err != nil {
return nil, fmt.Errorf("get namespace error: %w", err) return nil, fmt.Errorf("get namespace error: %w", err)
} }
@ -303,18 +294,6 @@ func (ac *apeChecker) getRoleWithoutContainerID(oID *refs.OwnerID, mh *session.R
return nativeschema.PropertyValueContainerRoleOthers, pk, nil return nativeschema.PropertyValueContainerRoleOthers, pk, nil
} }
func (ac *apeChecker) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.SetExtendedACL")
defer span.End()
if err := ac.validateContainerBoundedOperation(ctx, req.GetBody().GetEACL().GetContainerID(), req.GetMetaHeader(), req.GetVerificationHeader(),
nativeschema.MethodSetContainerEACL); err != nil {
return nil, err
}
return ac.next.SetExtendedACL(ctx, req)
}
func (ac *apeChecker) validateContainerBoundedOperation(ctx context.Context, containerID *refs.ContainerID, mh *session.RequestMetaHeader, vh *session.RequestVerificationHeader, op string) error { func (ac *apeChecker) validateContainerBoundedOperation(ctx context.Context, containerID *refs.ContainerID, mh *session.RequestMetaHeader, vh *session.RequestVerificationHeader, op string) error {
if vh == nil { if vh == nil {
return errMissingVerificationHeader return errMissingVerificationHeader
@ -629,6 +608,25 @@ func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {
return namespace, nil return namespace, nil
} }
func (ac *apeChecker) namespaceByKnownOwner(owner *refs.OwnerID) (string, error) {
var ownerSDK user.ID
if owner == nil {
return "", errOwnerIDIsNotSet
}
if err := ownerSDK.ReadFromV2(*owner); err != nil {
return "", err
}
addr, err := ownerSDK.ScriptHash()
if err != nil {
return "", err
}
subject, err := ac.frostFSIDClient.GetSubject(addr)
if err != nil {
return "", fmt.Errorf("get subject error: %w", err)
}
return subject.Namespace, nil
}
// validateNamespace validates a namespace set in a container. // validateNamespace validates a namespace set in a container.
// If frostfs-id contract stores a namespace N1 for an owner ID and a container within a request // If frostfs-id contract stores a namespace N1 for an owner ID and a container within a request
// is set with namespace N2 (via Zone() property), then N2 is invalid and the request is denied. // is set with namespace N2 (via Zone() property), then N2 is invalid and the request is denied.

View file

@ -9,7 +9,6 @@ import (
"net" "net"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -50,7 +49,6 @@ func TestAPE(t *testing.T) {
t.Run("deny get container by user claim tag", testDenyGetContainerByUserClaimTag) t.Run("deny get container by user claim tag", testDenyGetContainerByUserClaimTag)
t.Run("deny get container by IP", testDenyGetContainerByIP) t.Run("deny get container by IP", testDenyGetContainerByIP)
t.Run("deny get container by group id", testDenyGetContainerByGroupID) t.Run("deny get container by group id", testDenyGetContainerByGroupID)
t.Run("deny set container eACL for IR", testDenySetContainerEACLForIR)
t.Run("deny get container eACL for IR with session token", testDenyGetContainerEACLForIRSessionToken) t.Run("deny get container eACL for IR with session token", testDenyGetContainerEACLForIRSessionToken)
t.Run("deny put container for others with session token", testDenyPutContainerForOthersSessionToken) t.Run("deny put container for others with session token", testDenyPutContainerForOthersSessionToken)
t.Run("deny put container, read namespace from frostfsID", testDenyPutContainerReadNamespaceFromFrostfsID) t.Run("deny put container, read namespace from frostfsID", testDenyPutContainerReadNamespaceFromFrostfsID)
@ -665,84 +663,6 @@ func testDenyGetContainerByGroupID(t *testing.T) {
require.ErrorAs(t, err, &errAccessDenied) require.ErrorAs(t, err, &errAccessDenied)
} }
func testDenySetContainerEACLForIR(t *testing.T) {
t.Parallel()
srv := &srvStub{
calls: map[string]int{},
}
router := inmemory.NewInMemory()
contRdr := &containerStub{
c: map[cid.ID]*containercore.Container{},
}
ir := &irStub{
keys: [][]byte{},
}
nm := &netmapStub{}
frostfsIDSubjectReader := &frostfsidStub{
subjects: map[util.Uint160]*client.Subject{},
}
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
contID := cidtest.ID()
testContainer := containertest.Container()
pp := netmap.PlacementPolicy{}
require.NoError(t, pp.DecodeString("REP 1"))
testContainer.SetPlacementPolicy(pp)
contRdr.c[contID] = &containercore.Container{Value: testContainer}
nm.currentEpoch = 100
nm.netmaps = map[uint64]*netmap.NetMap{}
var testNetmap netmap.NetMap
testNetmap.SetEpoch(nm.currentEpoch)
testNetmap.SetNodes([]netmap.NodeInfo{{}})
nm.netmaps[nm.currentEpoch] = &testNetmap
nm.netmaps[nm.currentEpoch-1] = &testNetmap
_, _, err := router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(contID.EncodeToString()), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
Actions: chain.Actions{
Names: []string{
nativeschema.MethodSetContainerEACL,
},
},
Resources: chain.Resources{
Names: []string{
fmt.Sprintf(nativeschema.ResourceFormatRootContainer, contID.EncodeToString()),
},
},
Condition: []chain.Condition{
{
Kind: chain.KindRequest,
Key: nativeschema.PropertyKeyActorRole,
Value: nativeschema.PropertyValueContainerRoleIR,
Op: chain.CondStringEquals,
},
},
},
},
})
require.NoError(t, err)
req := &container.SetExtendedACLRequest{}
req.SetBody(&container.SetExtendedACLRequestBody{})
var refContID refs.ContainerID
contID.WriteToV2(&refContID)
req.GetBody().SetEACL(&acl.Table{})
req.GetBody().GetEACL().SetContainerID(&refContID)
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
require.NoError(t, signature.SignServiceMessage(&pk.PrivateKey, req))
ir.keys = append(ir.keys, pk.PublicKey().Bytes())
resp, err := apeSrv.SetExtendedACL(context.Background(), req)
require.Nil(t, resp)
var errAccessDenied *apistatus.ObjectAccessDenied
require.ErrorAs(t, err, &errAccessDenied)
}
func testDenyGetContainerEACLForIRSessionToken(t *testing.T) { func testDenyGetContainerEACLForIRSessionToken(t *testing.T) {
t.Parallel() t.Parallel()
srv := &srvStub{ srv := &srvStub{
@ -845,17 +765,22 @@ func testDenyPutContainerForOthersSessionToken(t *testing.T) {
keys: [][]byte{}, keys: [][]byte{},
} }
nm := &netmapStub{} nm := &netmapStub{}
frostfsIDSubjectReader := &frostfsidStub{
subjects: map[util.Uint160]*client.Subject{},
}
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
testContainer := containertest.Container() testContainer := containertest.Container()
owner := testContainer.Owner()
ownerAddr, err := owner.ScriptHash()
require.NoError(t, err)
frostfsIDSubjectReader := &frostfsidStub{
subjects: map[util.Uint160]*client.Subject{
ownerAddr: {},
},
}
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
nm.currentEpoch = 100 nm.currentEpoch = 100
nm.netmaps = map[uint64]*netmap.NetMap{} nm.netmaps = map[uint64]*netmap.NetMap{}
_, _, err := router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.NamespaceTarget(""), &chain.Chain{ _, _, err = router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.NamespaceTarget(""), &chain.Chain{
Rules: []chain.Rule{ Rules: []chain.Rule{
{ {
Status: chain.AccessDenied, Status: chain.AccessDenied,
@ -1229,11 +1154,6 @@ type srvStub struct {
calls map[string]int calls map[string]int
} }
func (s *srvStub) AnnounceUsedSpace(context.Context, *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
s.calls["AnnounceUsedSpace"]++
return &container.AnnounceUsedSpaceResponse{}, nil
}
func (s *srvStub) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) { func (s *srvStub) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) {
s.calls["Delete"]++ s.calls["Delete"]++
return &container.DeleteResponse{}, nil return &container.DeleteResponse{}, nil
@ -1259,11 +1179,6 @@ func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutRes
return &container.PutResponse{}, nil return &container.PutResponse{}, nil
} }
func (s *srvStub) SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
s.calls["SetExtendedACL"]++
return &container.SetExtendedACLResponse{}, nil
}
type irStub struct { type irStub struct {
keys [][]byte keys [][]byte
} }

View file

@ -6,7 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
container_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" container_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -29,24 +28,6 @@ func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Serv
} }
} }
// AnnounceUsedSpace implements Server.
func (a *auditService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
res, err := a.next.AnnounceUsedSpace(ctx, req)
if !a.enabled.Load() {
return res, err
}
var ids []*refs.ContainerID
for _, v := range req.GetBody().GetAnnouncements() {
ids = append(ids, v.GetContainerID())
}
audit.LogRequest(a.log, container_grpc.ContainerService_AnnounceUsedSpace_FullMethodName, req,
audit.TargetFromRefs(ids, &cid.ID{}), err == nil)
return res, err
}
// Delete implements Server. // Delete implements Server.
func (a *auditService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { func (a *auditService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
res, err := a.next.Delete(ctx, req) res, err := a.next.Delete(ctx, req)
@ -103,14 +84,3 @@ func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*con
audit.TargetFromRef(res.GetBody().GetContainerID(), &cid.ID{}), err == nil) audit.TargetFromRef(res.GetBody().GetContainerID(), &cid.ID{}), err == nil)
return res, err return res, err
} }
// SetExtendedACL implements Server.
func (a *auditService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
res, err := a.next.SetExtendedACL(ctx, req)
if !a.enabled.Load() {
return res, err
}
audit.LogRequest(a.log, container_grpc.ContainerService_SetExtendedACL_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetEACL().GetContainerID(), &cid.ID{}), err == nil)
return res, err
}

View file

@ -14,7 +14,6 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
SetExtendedACL(context.Context, *session.Token, *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error)
GetExtendedACL(context.Context, *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error) GetExtendedACL(context.Context, *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error)
} }
@ -96,24 +95,6 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
return resp, nil return resp, nil
} }
func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
meta := req.GetMetaHeader()
for origin := meta.GetOrigin(); origin != nil; origin = meta.GetOrigin() {
meta = origin
}
respBody, err := s.exec.SetExtendedACL(ctx, meta.GetSessionToken(), req.GetBody())
if err != nil {
return nil, fmt.Errorf("could not execute SetEACL request: %w", err)
}
resp := new(container.SetExtendedACLResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) { func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
respBody, err := s.exec.GetExtendedACL(ctx, req.GetBody()) respBody, err := s.exec.GetExtendedACL(ctx, req.GetBody())
if err != nil { if err != nil {

View file

@ -13,8 +13,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
var errMissingUserID = errors.New("missing user ID") var errMissingUserID = errors.New("missing user ID")
@ -204,10 +202,6 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil return res, nil
} }
func (s *morphExecutor) SetExtendedACL(_ context.Context, _ *sessionV2.Token, _ *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetExtendedACL not implemented")
}
func (s *morphExecutor) GetExtendedACL(_ context.Context, body *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error) { func (s *morphExecutor) GetExtendedACL(_ context.Context, body *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error) {
idV2 := body.GetContainerID() idV2 := body.GetContainerID()
if idV2 == nil { if idV2 == nil {

View file

@ -12,7 +12,5 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error)
SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error)
GetExtendedACL(context.Context, *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) GetExtendedACL(context.Context, *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error)
AnnounceUsedSpace(context.Context, *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error)
} }

View file

@ -57,15 +57,6 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
return resp, s.sigSvc.SignResponse(resp, err) return resp, s.sigSvc.SignResponse(resp, err)
} }
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.SetExtendedACLResponse)
return resp, s.sigSvc.SignResponse(resp, err)
}
resp, err := util.EnsureNonNilResponse(s.svc.SetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) { func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil { if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.GetExtendedACLResponse) resp := new(container.GetExtendedACLResponse)
@ -74,12 +65,3 @@ func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExte
resp, err := util.EnsureNonNilResponse(s.svc.GetExtendedACL(ctx, req)) resp, err := util.EnsureNonNilResponse(s.svc.GetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err) return resp, s.sigSvc.SignResponse(resp, err)
} }
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.AnnounceUsedSpaceResponse)
return resp, s.sigSvc.SignResponse(resp, err)
}
resp, err := util.EnsureNonNilResponse(s.svc.AnnounceUsedSpace(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}

Binary file not shown.

Binary file not shown.

View file

@ -26,4 +26,7 @@ enum HealthStatus {
// IR application is shutting down. // IR application is shutting down.
SHUTTING_DOWN = 3; SHUTTING_DOWN = 3;
// IR application is reconfiguring.
RECONFIGURING = 4;
} }

View file

@ -12,7 +12,7 @@ import (
) )
func (r *request) assemble(ctx context.Context) { func (r *request) assemble(ctx context.Context) {
if !r.canAssemble() { if !r.canAssembleComplexObject() {
r.log.Debug(logs.GetCanNotAssembleTheObject) r.log.Debug(logs.GetCanNotAssembleTheObject)
return return
} }
@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheObject) r.log.Debug(logs.GetTryingToAssembleTheObject)
r.prm.common = r.prm.common.WithLocalOnly(false) r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r) assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
r.log.Debug(logs.GetAssemblingSplittedObject, r.log.Debug(logs.GetAssemblingSplittedObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),

View file

@ -11,7 +11,7 @@ import (
) )
func (r *request) assembleEC(ctx context.Context) { func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() && r.isLocal() { if r.isRaw() {
r.log.Debug(logs.GetCanNotAssembleTheObject) r.log.Debug(logs.GetCanNotAssembleTheObject)
return return
} }
@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
} }
r.prm.common = r.prm.common.WithLocalOnly(false) r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch) assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
r.log.Debug(logs.GetAssemblingECObject, r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),

View file

@ -19,6 +19,7 @@ type assembler struct {
splitInfo *objectSDK.SplitInfo splitInfo *objectSDK.SplitInfo
rng *objectSDK.Range rng *objectSDK.Range
objGetter objectGetter objGetter objectGetter
head bool
currentOffset uint64 currentOffset uint64
@ -30,18 +31,23 @@ func newAssembler(
splitInfo *objectSDK.SplitInfo, splitInfo *objectSDK.SplitInfo,
rng *objectSDK.Range, rng *objectSDK.Range,
objGetter objectGetter, objGetter objectGetter,
head bool,
) *assembler { ) *assembler {
return &assembler{ return &assembler{
addr: addr, addr: addr,
rng: rng, rng: rng,
splitInfo: splitInfo, splitInfo: splitInfo,
objGetter: objGetter, objGetter: objGetter,
head: head,
} }
} }
// Assemble assembles splitted large object and writes it's content to ObjectWriter. // Assemble assembles splitted large object and writes it's content to ObjectWriter.
// It returns parent object. // It returns parent object.
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if a.head {
return a.assembleHeader(ctx, writer)
}
sourceObjectID, ok := a.getLastPartOrLinkObjectID() sourceObjectID, ok := a.getLastPartOrLinkObjectID()
if !ok { if !ok {
return nil, objectSDK.NewSplitInfoError(a.splitInfo) return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
return a.parentObject, nil return a.parentObject, nil
} }
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
var sourceObjectIDs []oid.ID
sourceObjectID, ok := a.splitInfo.Link()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
sourceObjectID, ok = a.splitInfo.LastPart()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
if len(sourceObjectIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
for _, sourceObjectID = range sourceObjectIDs {
obj, err := a.getParent(ctx, sourceObjectID, writer)
if err == nil {
return obj, nil
}
}
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
if err != nil {
return nil, err
}
parent := obj.Parent()
if parent == nil {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if err := writer.WriteHeader(ctx, parent); err != nil {
return nil, err
}
return obj, nil
}
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) { func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
sourceObjectID, ok := a.splitInfo.Link() sourceObjectID, ok := a.splitInfo.Link()
if ok { if ok {

View file

@ -37,7 +37,6 @@ type assemblerec struct {
cs container.Source cs container.Source
log *logger.Logger log *logger.Logger
head bool head bool
raw bool
traverserGenerator traverserGenerator traverserGenerator traverserGenerator
epoch uint64 epoch uint64
} }
@ -51,7 +50,6 @@ func newAssemblerEC(
cs container.Source, cs container.Source,
log *logger.Logger, log *logger.Logger,
head bool, head bool,
raw bool,
tg traverserGenerator, tg traverserGenerator,
epoch uint64, epoch uint64,
) *assemblerec { ) *assemblerec {
@ -64,7 +62,6 @@ func newAssemblerEC(
cs: cs, cs: cs,
log: log, log: log,
head: head, head: head,
raw: raw,
traverserGenerator: tg, traverserGenerator: tg,
epoch: epoch, epoch: epoch,
} }
@ -74,9 +71,6 @@ func newAssemblerEC(
// It returns parent object. // It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
switch { switch {
case a.raw:
err := a.reconstructRawError(ctx)
return nil, err
case a.head: case a.head:
return a.reconstructHeader(ctx, writer) return a.reconstructHeader(ctx, writer)
case a.rng != nil: case a.rng != nil:
@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
return c.Reconstruct(parts) return c.Reconstruct(parts)
} }
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
chunks := make(map[string]objectSDK.ECChunk)
var chunksGuard sync.Mutex
for _, ch := range a.ecInfo.localChunks {
chunks[string(ch.ID.GetValue())] = ch
}
objID := a.addr.Object()
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for {
batch := trav.Next()
if len(batch) == 0 {
break
}
for _, node := range batch {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
return nil
}
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
chunksGuard.Lock()
defer chunksGuard.Unlock()
for _, ch := range nodeChunks {
chunks[string(ch.ID.GetValue())] = ch
}
return nil
})
}
}
if err = eg.Wait(); err != nil {
return err
}
return createECInfoErr(chunks)
}
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object { func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy()) dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy()) parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
@ -293,7 +237,7 @@ func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch object
return nil return nil
} }
var addr oid.Address var addr oid.Address
addr.SetContainer(addr.Container()) addr.SetContainer(a.addr.Container())
addr.SetObject(objID) addr.SetObject(objID)
var object *objectSDK.Object var object *objectSDK.Object
if a.head { if a.head {
@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
} }
return object return object
} }
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
info := objectSDK.NewECInfo()
for _, ch := range chunks {
info.AddChunk(ch)
}
return objectSDK.NewECInfoError(info)
}

View file

@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC: case statusEC:
exec.log.Debug(logs.GetRequestedObjectIsEC) exec.log.Debug(logs.GetRequestedObjectIsEC)
if exec.isRaw() && execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
}
exec.assembleEC(ctx) exec.assembleEC(ctx)
default: default:
exec.log.Debug(logs.OperationFinishedWithError, exec.log.Debug(logs.OperationFinishedWithError,

View file

@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
t.Run("VIRTUAL", func(t *testing.T) { t.Run("VIRTUAL", func(t *testing.T) {
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) { testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
headPrm := newHeadPrm(false, nil) headPrm := newHeadPrm(true, nil)
headPrm.WithAddress(addr) headPrm.WithAddress(addr)
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

View file

@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
switch { switch {
default: default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
r.status = statusUndefined if r.status != statusEC {
r.err = new(apistatus.ObjectNotFound) // for raw requests, continue to collect other parts
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
}
return false
case err == nil: case err == nil:
r.status = statusOK r.status = statusOK
r.err = nil r.err = nil
@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.collectedObject = obj r.collectedObject = obj
r.writeCollectedObject(ctx) r.writeCollectedObject(ctx)
} }
return true
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
r.status = statusINHUMED r.status = statusINHUMED
r.err = errRemoved r.err = errRemoved
return true
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange r.status = statusOutOfRange
r.err = errOutOfRange r.err = errOutOfRange
return true
case errors.As(err, &errSplitInfo): case errors.As(err, &errSplitInfo):
r.status = statusVIRTUAL r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
return true
case errors.As(err, &errECInfo): case errors.As(err, &errECInfo):
r.status = statusEC r.status = statusEC
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo()) r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
if r.isRaw() {
return false // continue to collect all parts
}
return true
} }
return r.status != statusUndefined
} }
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) { func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {

View file

@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
return r.keyStore.GetKey(sessionInfo) return r.keyStore.GetKey(sessionInfo)
} }
func (r *request) canAssemble() bool { func (r *request) canAssembleComplexObject() bool {
return !r.isRaw() && !r.headOnly() return !r.isRaw()
} }
func (r *request) splitInfo() *objectSDK.SplitInfo { func (r *request) splitInfo() *objectSDK.SplitInfo {

View file

@ -17,6 +17,7 @@ import (
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -39,7 +40,7 @@ type ecWriter struct {
} }
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx) relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil { if err != nil {
return err return err
} }
@ -65,7 +66,7 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return e.writeRawObject(ctx, obj) return e.writeRawObject(ctx, obj)
} }
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) { func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil { if e.relay == nil {
return false, nil return false, nil
} }
@ -77,7 +78,13 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
// object can be splitted or saved local // object can be splitted or saved local
return false, nil return false, nil
} }
if err := e.relayToContainerNode(ctx); err != nil { objID := object.AddressOf(obj).Object()
var index uint32
if obj.ECHeader() != nil {
objID = obj.ECHeader().Parent()
index = obj.ECHeader().Index()
}
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err return false, err
} }
return true, nil return true, nil
@ -102,18 +109,20 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
return false, nil return false, nil
} }
func (e *ecWriter) relayToContainerNode(ctx context.Context) error { func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(e.placementOpts...) t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil { if err != nil {
return err return err
} }
var lastErr error var lastErr error
offset := int(index)
for { for {
nodes := t.Next() nodes := t.Next()
if len(nodes) == 0 { if len(nodes) == 0 {
break break
} }
for _, node := range nodes { for idx := range nodes {
node := nodes[(idx+offset)%len(nodes)]
var info client.NodeInfo var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node) client.NodeInfoFromNetmapElement(&info, node)
@ -149,6 +158,10 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
} }
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj)
}
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...) t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil { if err != nil {
return err return err

View file

@ -19,6 +19,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -229,6 +230,9 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
if len(copiesNumber) > 0 && !result.isEC { if len(copiesNumber) > 0 && !result.isEC {
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber)) result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
} }
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
}
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value)) result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
objID, ok := obj.ID() objID, ok := obj.ID()

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
@ -212,10 +213,10 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) { if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm) return p.newECWriter(prm)
} }
return p.newDefaultObjectWriter(prm) return p.newDefaultObjectWriter(prm, false)
} }
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error var relay func(context.Context, nodeDesc) error
if p.relay != nil { if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error { relay = func(ctx context.Context, node nodeDesc) error {
@ -232,9 +233,16 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWri
} }
} }
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
}
return &distributedTarget{ return &distributedTarget{
cfg: p.cfg, cfg: p.cfg,
placementOpts: prm.traverseOpts, placementOpts: traverseOpts,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local { if node.local {
return localTarget{ return localTarget{
@ -266,7 +274,7 @@ func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
commonPrm: prm.common, commonPrm: prm.common,
relay: p.relay, relay: p.relay,
}, },
repWriter: p.newDefaultObjectWriter(prm), repWriter: p.newDefaultObjectWriter(prm, true),
} }
} }

Binary file not shown.