Compare commits

...

18 commits

Author SHA1 Message Date
f82b7e1ae3 [#1135] ir: Add healthstatus RECONFIGURING
All checks were successful
DCO action / DCO (pull_request) Successful in 1m57s
Vulncheck / Vulncheck (pull_request) Successful in 2m0s
Build / Build Components (1.21) (pull_request) Successful in 2m38s
Build / Build Components (1.22) (pull_request) Successful in 2m36s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m54s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m48s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m50s
Tests and linters / Tests with -race (pull_request) Successful in 2m53s
Tests and linters / Staticcheck (pull_request) Successful in 2m58s
Tests and linters / Lint (pull_request) Successful in 3m15s
Tests and linters / gopls check (pull_request) Successful in 3m33s
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-08-15 18:24:10 +03:00
4c520be9f1 [#1313] blobovnicza: Prevent concurrent Put/Close
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 1m59s
DCO action / DCO (pull_request) Successful in 1m53s
Build / Build Components (1.21) (pull_request) Successful in 2m20s
Build / Build Components (1.22) (pull_request) Successful in 2m26s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m56s
Tests and linters / Staticcheck (pull_request) Successful in 2m47s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m56s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m57s
Tests and linters / Lint (pull_request) Successful in 3m7s
Tests and linters / Tests with -race (pull_request) Successful in 3m7s
Tests and linters / gopls check (pull_request) Successful in 3m46s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-08-15 15:49:07 +03:00
8b4f3d82c9 [#1302] putSvc: Override SuccessAfter for non-regular objects in EC containers
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 10:06:18 +00:00
078b5fdd6e [#1302] writecache: Allow to specify custom page size
All checks were successful
DCO action / DCO (pull_request) Successful in 2m44s
Vulncheck / Vulncheck (pull_request) Successful in 2m49s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m31s
Build / Build Components (1.22) (pull_request) Successful in 3m23s
Build / Build Components (1.21) (pull_request) Successful in 3m25s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m41s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m41s
Tests and linters / Tests with -race (pull_request) Successful in 3m49s
Tests and linters / Staticcheck (pull_request) Successful in 3m52s
Tests and linters / Lint (pull_request) Successful in 4m15s
Tests and linters / gopls check (pull_request) Successful in 4m19s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 12:39:11 +03:00
b3ab9589a5 [#1302] writecache: Add put->flush->put benchmark
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 12:35:05 +03:00
54862250b2 [#1295] getSvc: Assemble complex EC object headers without linking object
All checks were successful
DCO action / DCO (pull_request) Successful in 1m30s
Vulncheck / Vulncheck (pull_request) Successful in 1m46s
Build / Build Components (1.22) (pull_request) Successful in 2m18s
Build / Build Components (1.21) (pull_request) Successful in 2m27s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m46s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m46s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m48s
Tests and linters / Staticcheck (pull_request) Successful in 2m42s
Tests and linters / Tests with -race (pull_request) Successful in 2m51s
Tests and linters / Lint (pull_request) Successful in 3m9s
Tests and linters / gopls check (pull_request) Successful in 3m20s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-06 17:51:23 +03:00
89d670c2f0 [#1295] engine: Resolve funlen linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-06 17:51:06 +03:00
87c5954f4e [#1295] engine: Log object address in case of error
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-06 17:50:48 +03:00
28fc41bd98 [#1291] morph: Reconnect to the highest priority endpoint
All checks were successful
DCO action / DCO (pull_request) Successful in 1m50s
Vulncheck / Vulncheck (pull_request) Successful in 2m7s
Build / Build Components (1.21) (pull_request) Successful in 2m28s
Build / Build Components (1.22) (pull_request) Successful in 2m25s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m4s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m7s
Tests and linters / Staticcheck (pull_request) Successful in 3m0s
Tests and linters / Lint (pull_request) Successful in 3m16s
Tests and linters / Tests with -race (pull_request) Successful in 3m15s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m30s
Tests and linters / gopls check (pull_request) Successful in 4m10s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-05 13:17:44 +03:00
20f308876c [#1288] putSvc: Respect TTL for EC put
All checks were successful
DCO action / DCO (pull_request) Successful in 3m6s
Vulncheck / Vulncheck (pull_request) Successful in 3m10s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m33s
Build / Build Components (1.22) (pull_request) Successful in 3m47s
Build / Build Components (1.21) (pull_request) Successful in 3m51s
Tests and linters / Staticcheck (pull_request) Successful in 4m0s
Tests and linters / Tests with -race (pull_request) Successful in 4m5s
Tests and linters / Lint (pull_request) Successful in 4m18s
Tests and linters / gopls check (pull_request) Successful in 5m14s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m57s
Pre-commit hooks / Pre-commit (pull_request) Successful in 57s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-02 17:55:54 +03:00
02d191e9a6 [#1279] adm: Interpret "root" name as empty for namespace target type
All checks were successful
DCO action / DCO (pull_request) Successful in 1m35s
Vulncheck / Vulncheck (pull_request) Successful in 1m57s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m11s
Build / Build Components (1.22) (pull_request) Successful in 2m26s
Build / Build Components (1.21) (pull_request) Successful in 2m32s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m30s
Tests and linters / Staticcheck (pull_request) Successful in 2m45s
Tests and linters / Tests with -race (pull_request) Successful in 2m54s
Tests and linters / Lint (pull_request) Successful in 3m8s
Tests and linters / gopls check (pull_request) Successful in 4m16s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m57s
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-08-01 16:11:44 +03:00
cecb3f7867 [#1282] cli: Allow to external addresses first for object nodes
All checks were successful
DCO action / DCO (pull_request) Successful in 1m12s
Build / Build Components (1.22) (pull_request) Successful in 2m14s
Build / Build Components (1.21) (pull_request) Successful in 2m14s
Vulncheck / Vulncheck (pull_request) Successful in 1m56s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m44s
Tests and linters / Staticcheck (pull_request) Successful in 2m49s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m53s
Tests and linters / Lint (pull_request) Successful in 3m15s
Tests and linters / Tests with -race (pull_request) Successful in 3m22s
Tests and linters / gopls check (pull_request) Successful in 3m34s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m58s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-01 09:18:18 +03:00
f53d30fa95 [#1278] containerSvc: Validate FrostFSID subject exitence on Put
All checks were successful
Build / Build Components (1.21) (pull_request) Successful in 1m37s
Build / Build Components (1.22) (pull_request) Successful in 2m49s
Vulncheck / Vulncheck (pull_request) Successful in 2m25s
Tests and linters / Tests with -race (pull_request) Successful in 5m21s
Tests and linters / Tests (1.22) (pull_request) Successful in 5m48s
DCO action / DCO (pull_request) Successful in 31s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m12s
Tests and linters / Staticcheck (pull_request) Successful in 1m44s
Tests and linters / Lint (pull_request) Successful in 2m20s
Tests and linters / gopls check (pull_request) Successful in 2m33s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m36s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-29 16:05:10 +03:00
1b92817bd3 [#1278] ir: Do not allow to create container without FrostFSID record
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-29 16:04:53 +03:00
32ec421ac7 [#1277] go.mod: Update api-go
All checks were successful
DCO action / DCO (pull_request) Successful in 1m24s
Vulncheck / Vulncheck (pull_request) Successful in 1m26s
Build / Build Components (1.21) (pull_request) Successful in 2m17s
Build / Build Components (1.22) (pull_request) Successful in 2m9s
Tests and linters / Staticcheck (pull_request) Successful in 3m1s
Tests and linters / gopls check (pull_request) Successful in 3m29s
Tests and linters / Lint (pull_request) Successful in 4m16s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m27s
Tests and linters / Tests with -race (pull_request) Successful in 7m5s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m1s
Tests and linters / Tests (1.22) (pull_request) Successful in 8m2s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-07-26 17:27:41 +03:00
4d102b05e5 [#1274] go.mod: Update neo-go version that fixes ws-client
All checks were successful
DCO action / DCO (pull_request) Successful in 1m17s
Build / Build Components (1.21) (pull_request) Successful in 1m59s
Build / Build Components (1.22) (pull_request) Successful in 2m9s
Vulncheck / Vulncheck (pull_request) Successful in 1m45s
Tests and linters / Staticcheck (pull_request) Successful in 2m45s
Tests and linters / gopls check (pull_request) Successful in 3m18s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m2s
Tests and linters / Lint (pull_request) Successful in 4m58s
Tests and linters / Tests with -race (pull_request) Successful in 5m4s
Tests and linters / Tests (1.21) (pull_request) Successful in 7m46s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m48s
* Update go.mod;
* This neo-go package version contains fix for the wsclient that
  allows to morph event listener refresh the invalidated websocket
  connection to neo-go.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-07-26 15:52:03 +03:00
308da7cb01 [#1271] getSvc: Fix local EC chunk get
All checks were successful
DCO action / DCO (pull_request) Successful in 1m2s
Vulncheck / Vulncheck (pull_request) Successful in 1m22s
Build / Build Components (1.22) (pull_request) Successful in 2m36s
Build / Build Components (1.21) (pull_request) Successful in 2m36s
Tests and linters / Staticcheck (pull_request) Successful in 3m19s
Tests and linters / gopls check (pull_request) Successful in 3m27s
Tests and linters / Lint (pull_request) Successful in 4m22s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m14s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m12s
Tests and linters / Tests (1.22) (pull_request) Successful in 8m14s
Tests and linters / Tests with -race (pull_request) Successful in 8m29s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 10:23:35 +03:00
37b83c0856 [#1271] getSvc: Fix head --raw assemble for EC
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-24 10:23:15 +03:00
68 changed files with 320 additions and 556 deletions

View file

@ -38,6 +38,12 @@ var (
func parseTarget(cmd *cobra.Command) policyengine.Target {
name, _ := cmd.Flags().GetString(targetNameFlag)
typ, err := parseTargetType(cmd)
// interpret "root" namespace as empty
if typ == policyengine.Namespace && name == "root" {
name = ""
}
commonCmd.ExitOnErr(cmd, "read target type error: %w", err)
return policyengine.Target{

View file

@ -214,29 +214,6 @@ func EACL(ctx context.Context, prm EACLPrm) (res EACLRes, err error) {
return
}
// SetEACLPrm groups parameters of SetEACL operation.
type SetEACLPrm struct {
Client *client.Client
ClientParams client.PrmContainerSetEACL
}
// SetEACLRes groups the resulting values of SetEACL operation.
type SetEACLRes struct{}
// SetEACL requests to save an eACL table in FrostFS.
//
// Operation is asynchronous and no guaranteed even in the absence of errors.
// The required time is also not predictable.
//
// Success can be verified by reading by container identifier.
//
// Returns any error which prevented the operation from completing correctly in error return.
func SetEACL(ctx context.Context, prm SetEACLPrm) (res SetEACLRes, err error) {
_, err = prm.Client.ContainerSetEACL(ctx, prm.ClientParams)
return
}
// NetworkInfoPrm groups parameters of NetworkInfo operation.
type NetworkInfoPrm struct {
Client *client.Client

View file

@ -26,7 +26,6 @@ func init() {
listContainerObjectsCmd,
getContainerInfoCmd,
getExtendedACLCmd,
setExtendedACLCmd,
containerNodesCmd,
policyPlaygroundCmd,
}
@ -39,7 +38,6 @@ func init() {
initContainerListObjectsCmd()
initContainerInfoCmd()
initContainerGetEACLCmd()
initContainerSetEACLCmd()
initContainerNodesCmd()
initContainerPolicyPlaygroundCmd()
@ -53,7 +51,6 @@ func init() {
}{
{createContainerCmd, "PUT"},
{deleteContainerCmd, "DELETE"},
{setExtendedACLCmd, "SETEACL"},
} {
commonflags.InitSession(el.cmd, "container "+el.verb)
}

View file

@ -1,108 +0,0 @@
package container
import (
"bytes"
"errors"
"time"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/spf13/cobra"
)
var flagVarsSetEACL struct {
noPreCheck bool
srcPath string
}
var setExtendedACLCmd = &cobra.Command{
Use: "set-eacl",
Short: "Set new extended ACL table for container",
Long: `Set new extended ACL table for container.
Container ID in EACL table will be substituted with ID from the CLI.`,
Run: func(cmd *cobra.Command, _ []string) {
id := parseContainerID(cmd)
eaclTable := common.ReadEACL(cmd, flagVarsSetEACL.srcPath)
tok := getSession(cmd)
eaclTable.SetCID(id)
pk := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
if !flagVarsSetEACL.noPreCheck {
cmd.Println("Checking the ability to modify access rights in the container...")
extendable, err := internalclient.IsACLExtendable(cmd.Context(), cli, id)
commonCmd.ExitOnErr(cmd, "Extensibility check failure: %w", err)
if !extendable {
commonCmd.ExitOnErr(cmd, "", errors.New("container ACL is immutable"))
}
cmd.Println("ACL extension is enabled in the container, continue processing.")
}
setEACLPrm := internalclient.SetEACLPrm{
Client: cli,
ClientParams: client.PrmContainerSetEACL{
Table: eaclTable,
Session: tok,
},
}
_, err := internalclient.SetEACL(cmd.Context(), setEACLPrm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
if containerAwait {
exp, err := eaclTable.Marshal()
commonCmd.ExitOnErr(cmd, "broken EACL table: %w", err)
cmd.Println("awaiting...")
getEACLPrm := internalclient.EACLPrm{
Client: cli,
ClientParams: client.PrmContainerEACL{
ContainerID: &id,
},
}
for i := 0; i < awaitTimeout; i++ {
time.Sleep(1 * time.Second)
res, err := internalclient.EACL(cmd.Context(), getEACLPrm)
if err == nil {
// compare binary values because EACL could have been set already
table := res.EACL()
got, err := table.Marshal()
if err != nil {
continue
}
if bytes.Equal(exp, got) {
cmd.Println("EACL has been persisted on sidechain")
return
}
}
}
commonCmd.ExitOnErr(cmd, "", errSetEACLTimeout)
}
},
}
func initContainerSetEACLCmd() {
commonflags.Init(setExtendedACLCmd)
flags := setExtendedACLCmd.Flags()
flags.StringVar(&containerID, commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
flags.StringVar(&flagVarsSetEACL.srcPath, "table", "", "path to file with JSON or binary encoded EACL table")
flags.BoolVar(&containerAwait, "await", false, "block execution until EACL is persisted")
flags.BoolVar(&flagVarsSetEACL.noPreCheck, "no-precheck", false, "do not pre-check the extensibility of the container ACL")
}

View file

@ -20,7 +20,6 @@ const (
var (
errCreateTimeout = errors.New("timeout: container has not been persisted on sidechain")
errDeleteTimeout = errors.New("timeout: container has not been removed from sidechain")
errSetEACLTimeout = errors.New("timeout: EACL has not been persisted on sidechain")
)
func parseContainerID(cmd *cobra.Command) cid.ID {

View file

@ -31,6 +31,7 @@ import (
const (
verifyPresenceAllFlag = "verify-presence-all"
preferInternalAddressesFlag = "prefer-internal-addresses"
)
var (
@ -97,6 +98,7 @@ func initObjectNodesCmd() {
flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes.")
flags.Bool(commonflags.JSON, false, "Print information about the object placement as json.")
flags.Bool(preferInternalAddressesFlag, false, "Use internal addresses first to get object info.")
}
func objectNodes(cmd *cobra.Command, _ []string) {
@ -449,11 +451,20 @@ func getNodesToCheckObjectExistance(cmd *cobra.Command, netmap *netmapSDK.NetMap
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
var cli *client.Client
var addresses []string
if preferInternal, _ := cmd.Flags().GetBool(preferInternalAddressesFlag); preferInternal {
candidate.IterateNetworkEndpoints(func(s string) bool {
addresses = append(addresses, s)
return false
})
addresses = append(addresses, candidate.ExternalAddresses()...)
} else {
addresses = append(addresses, candidate.ExternalAddresses()...)
candidate.IterateNetworkEndpoints(func(s string) bool {
addresses = append(addresses, s)
return false
})
}
var lastErr error
for _, address := range addresses {
var networkAddr network.Address

View file

@ -7,6 +7,7 @@ import (
configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"github.com/spf13/viper"
"go.uber.org/zap"
)
@ -81,6 +82,10 @@ func watchForSignal(cancel func()) {
return
case <-sighupCh:
log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)
if !innerRing.CompareAndSwapHealthStatus(control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
log.Info(logs.FrostFSNodeSIGHUPSkip)
break
}
err := reloadConfig()
if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
@ -92,6 +97,7 @@ func watchForSignal(cancel func()) {
if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
}
innerRing.CompareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
}

View file

@ -25,7 +25,7 @@ func init() {
func inspectFunc(cmd *cobra.Command, _ []string) {
var data []byte
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close()

View file

@ -31,7 +31,7 @@ func listFunc(cmd *cobra.Command, _ []string) {
return err
}
db, err := writecache.OpenDB(vPath, true, os.OpenFile)
db, err := writecache.OpenDB(vPath, true, os.OpenFile, 0)
common.ExitOnErr(cmd, common.Errf("could not open write-cache db: %w", err))
defer db.Close()

View file

@ -153,6 +153,7 @@ type shardCfg struct {
flushWorkerCount int
sizeLimit uint64
noSync bool
pageSize int
}
piloramaCfg struct {
@ -271,6 +272,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
wc.path = writeCacheCfg.Path()
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.pageSize = writeCacheCfg.BoltDB().PageSize()
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
@ -863,6 +865,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
writecache.WithPath(wcRead.path),
writecache.WithMaxBatchSize(wcRead.maxBatchSize),
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
writecache.WithPageSize(wcRead.pageSize),
writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),

View file

@ -78,6 +78,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 3221225472, wc.SizeLimit())
require.EqualValues(t, 4096, wc.BoltDB().PageSize())
require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
@ -133,6 +134,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 134217728, wc.MaxObjectSize())
require.EqualValues(t, 30, wc.WorkerCount())
require.EqualValues(t, 4294967296, wc.SizeLimit())
require.EqualValues(t, 0, wc.BoltDB().PageSize())
require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())

View file

@ -60,3 +60,14 @@ func (x *Config) MaxBatchSize() int {
func (x *Config) NoSync() bool {
return config.BoolSafe((*config.Config)(x), "no_sync")
}
// PageSize returns the value of "page_size" config parameter.
//
// Returns 0 if the value is not a positive number.
func (x *Config) PageSize() int {
s := int(config.SizeInBytesSafe((*config.Config)(x), "page_size"))
if s < 0 {
s = 0
}
return s
}

View file

@ -105,6 +105,7 @@ FROSTFS_STORAGE_SHARD_0_WRITECACHE_SMALL_OBJECT_SIZE=16384
FROSTFS_STORAGE_SHARD_0_WRITECACHE_MAX_OBJECT_SIZE=134217728
FROSTFS_STORAGE_SHARD_0_WRITECACHE_FLUSH_WORKER_COUNT=30
FROSTFS_STORAGE_SHARD_0_WRITECACHE_CAPACITY=3221225472
FROSTFS_STORAGE_SHARD_0_WRITECACHE_PAGE_SIZE=4096
### Metabase config
FROSTFS_STORAGE_SHARD_0_METABASE_PATH=tmp/0/meta
FROSTFS_STORAGE_SHARD_0_METABASE_PERM=0644

View file

@ -148,7 +148,8 @@
"small_object_size": 16384,
"max_object_size": 134217728,
"flush_worker_count": 30,
"capacity": 3221225472
"capacity": 3221225472,
"page_size": 4096
},
"metabase": {
"path": "tmp/0/meta",

View file

@ -171,6 +171,7 @@ storage:
no_sync: true
path: tmp/0/cache # write-cache root directory
capacity: 3221225472 # approximate write-cache total size, bytes
page_size: 4k
metabase:
path: tmp/0/meta # metabase path

View file

@ -290,6 +290,7 @@ writecache:
small_object_size: 16384
max_object_size: 134217728
flush_worker_count: 30
page_size: '4k'
```
| Parameter | Type | Default value | Description |
@ -301,6 +302,7 @@ writecache:
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
| `max_batch_size` | `int` | `1000` | Maximum amount of small object `PUT` operations to perform in a single transaction. |
| `max_batch_delay` | `duration` | `10ms` | Maximum delay before a batch starts. |
| `page_size` | `size` | `0` | Page size overrides the default OS page size for small objects storage. Does not affect the existing storage. |
# `node` section

10
go.mod
View file

@ -4,11 +4,11 @@ go 1.21
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240530152826-2f6d3209e1d3
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240617140730-1a5886e776de
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
@ -23,7 +23,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/nspcc-dev/neo-go v0.106.0
github.com/nspcc-dev/neo-go v0.106.2
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0
github.com/paulmach/orb v0.11.0
@ -38,7 +38,7 @@ require (
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0
golang.org/x/term v0.18.0
@ -128,4 +128,4 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240611123832-594f716b3d18
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928

BIN
go.sum

Binary file not shown.

View file

@ -11,7 +11,6 @@ import (
// Client is an interface of FrostFS storage
// node's client.
type Client interface {
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error)
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)

View file

@ -3,7 +3,6 @@ package container
import (
"crypto/ecdsa"
"encoding/hex"
"fmt"
"testing"
"time"
@ -238,5 +237,5 @@ func (c *testMorphClient) NotarySignAndInvokeTX(mainTx *transaction.Transaction)
type testFrostFSIDClient struct{}
func (c *testFrostFSIDClient) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error) {
return nil, fmt.Errorf("subject not found")
return &frostfsidclient.Subject{}, nil
}

View file

@ -180,11 +180,6 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
}
}
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
if !hasNamespace {
return nil
}
addr, err := util.Uint160DecodeBytesBE(cnr.Owner().WalletBytes()[1 : 1+util.Uint160Size])
if err != nil {
return fmt.Errorf("could not get container owner address: %w", err)
@ -195,6 +190,11 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
return fmt.Errorf("could not get subject from FrostfsID contract: %w", err)
}
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
if !hasNamespace {
return nil
}
if subject.Namespace != namespace {
return errContainerAndOwnerNamespaceDontMatch
}

View file

@ -161,6 +161,16 @@ func (s *Server) setHealthStatus(hs control.HealthStatus) {
}
}
func (s *Server) CompareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
if swapped = s.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
s.notifySystemd(newSt)
if s.irMetrics != nil {
s.irMetrics.SetHealth(int32(newSt))
}
}
return
}
// HealthStatus returns the current health status of the IR application.
func (s *Server) HealthStatus() control.HealthStatus {
return control.HealthStatus(s.healthStatus.Load())
@ -186,6 +196,8 @@ func (s *Server) notifySystemd(st control.HealthStatus) {
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
case control.HealthStatus_SHUTTING_DOWN:
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
case control.HealthStatus_RECONFIGURING:
err = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
default:
err = sdnotify.Status(fmt.Sprintf("%v", st))
}

View file

@ -22,7 +22,7 @@ type Blobovnicza struct {
boltDB *bbolt.DB
opened bool
controlMtx sync.Mutex
controlMtx sync.RWMutex
}
// Option is an option of Blobovnicza's constructor.

View file

@ -45,6 +45,9 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
))
defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
addrKey := addressKey(prm.addr)
found := false

View file

@ -21,6 +21,9 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
))
defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -51,6 +51,9 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
))
defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
var (
data []byte
addrKey = addressKey(prm.addr)

View file

@ -128,6 +128,9 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
))
defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -29,6 +29,9 @@ func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
))
defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
key := addressKey(prm.Address)
err := b.boltDB.Update(func(tx *bbolt.Tx) error {

View file

@ -64,6 +64,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
))
defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
sz := uint64(len(prm.objData))
bucketName := bucketForSize(sz)
key := addressKey(prm.addr)

View file

@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
return false
} else {
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check object existence", err)
e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
}
return false
}
@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
_, err = sh.Inhume(ctx, shPrm)
if err != nil {
e.reportShardError(sh, "could not inhume object in shard", err)
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))
var target *apistatus.ObjectLocked
locked.is = errors.As(err, &target)
@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
var objID oid.ID
err := objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(sh, "could not delete EC chunk", err)
e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
}
addr.SetObject(objID)
inhumePrm.MarkAsGarbage(addr)

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// exists return in the first value true if object exists.
@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
}
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
}
return false
}

View file

@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
i.ObjectExpired = true
return true
default:
i.Engine.reportShardError(sh, "could not get object from shard", err)
i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
return false
}
})

View file

@ -12,6 +12,7 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// HeadPrm groups the parameters of Head operation.
@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
outError = new(apistatus.ObjectNotFound)
return true
default:
e.reportShardError(sh, "could not head object from shard", err)
e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
return false
}
}
@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
return true
})
if head != nil {
return HeadRes{head: head}, nil
}
if outSI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
} else if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
} else if head == nil {
return HeadRes{}, outError
}
return HeadRes{
head: head,
}, nil
if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
}
return HeadRes{}, outError
}
// Head reads object header from local storage by provided address.

View file

@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
var siErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
e.reportShardError(sh, "could not check for presents in shard", err)
e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
return
}
@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
return true
}
e.reportShardError(sh, "could not inhume object in shard", err)
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
return false
}
@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
locked, err = h.Shard.IsLocked(ctx, addr)
if err != nil {
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err
return false
@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
ld, err := h.Shard.GetLocked(ctx, addr)
if err != nil {
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err
}

View file

@ -13,6 +13,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var errLockFailed = errors.New("lock operation failed")
@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
// code is pretty similar to inhumeAddr, maybe unify?
root := false
var addrLocked oid.Address
addrLocked.SetContainer(idCnr)
addrLocked.SetObject(locked)
e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
defer func() {
// if object is root we continue since information about it
@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
if checkExists {
var existsPrm shard.ExistsPrm
existsPrm.Address = addrLocked
exRes, err := sh.Exists(ctx, existsPrm)
if err != nil {
var siErr *objectSDK.SplitInfoError
@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
var objID oid.ID
err = objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return false
}
eclocked = append(eclocked, objID)
}
err = sh.Lock(ctx, idCnr, locker, eclocked)
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return false
}
root = true
@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
// do not lock it
return true
}
e.reportShardError(sh, "could not check locked object for presence in shard", err)
e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return
}
@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
var errIrregular *apistatus.LockNonRegularObject
if errors.As(err, &errIrregular) {
status = 1
return true
}
return false
}
status = 2
return true
})
return
}

View file

@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
return
}
e.reportShardError(sh, "could not put object to shard", err)
e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
return
}

View file

@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
return true // stop, return it back
default:
i.Engine.reportShardError(sh, "could not get object from shard", err)
i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
return false
}
})

View file

@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
// pick last item, for now there is not difference which address to pick
// but later list might be sorted so first or last value can be more
// prioritized to choose
virtualOID := relativeLst[len(relativeLst)-1]
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
var data []byte
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
virtualOID := relativeLst[len(relativeLst)-i-1]
data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
}
if len(data) == 0 {
// check if any of the relatives is an EC object
for _, relative := range relativeLst {
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
if len(data) > 0 {
// we can't return object headers, but can return error,
// so assembler can try to assemble complex object
return nil, getSplitInfoError(tx, cnr, key)
}
}
}
child := objectSDK.New()

View file

@ -2,6 +2,7 @@ package benchmark
import (
"context"
"fmt"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -27,6 +28,24 @@ func BenchmarkWritecachePar(b *testing.B) {
})
}
func BenchmarkWriteAfterDelete(b *testing.B) {
const payloadSize = 32 << 10
const parallel = 25
cache := newCache(b)
benchmarkPutPrepare(b, cache)
b.Run(fmt.Sprintf("%dB_before", payloadSize), func(b *testing.B) {
b.SetParallelism(parallel)
benchmarkRunPar(b, cache, payloadSize)
})
require.NoError(b, cache.Flush(context.Background(), false, false))
b.Run(fmt.Sprintf("%dB_after", payloadSize), func(b *testing.B) {
b.SetParallelism(parallel)
benchmarkRunPar(b, cache, payloadSize)
})
require.NoError(b, cache.Close())
}
func benchmarkPutSeq(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache)
defer func() { require.NoError(b, cache.Close()) }()
@ -54,6 +73,10 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
benchmarkPutPrepare(b, cache)
defer func() { require.NoError(b, cache.Close()) }()
benchmarkRunPar(b, cache, size)
}
func benchmarkRunPar(b *testing.B, cache writecache.Cache, size uint64) {
ctx := context.Background()
b.ResetTimer()

View file

@ -45,6 +45,8 @@ type options struct {
metrics Metrics
// disableBackgroundFlush is for testing purposes only.
disableBackgroundFlush bool
// pageSize is bbolt's page size config value
pageSize int
}
// WithLogger sets logger.
@ -163,3 +165,10 @@ func WithDisableBackgroundFlush() Option {
o.disableBackgroundFlush = true
}
}
// WithPageSize sets bbolt's page size.
func WithPageSize(s int) Option {
return func(o *options) {
o.pageSize = s
}
}

View file

@ -32,7 +32,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
return err
}
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile)
c.db, err = OpenDB(c.path, mod.ReadOnly(), c.openFile, c.pageSize)
if err != nil {
return fmt.Errorf("could not open database: %w", err)
}

View file

@ -10,11 +10,12 @@ import (
)
// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true.
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) {
func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error), pageSize int) (*bbolt.DB, error) {
return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{
NoFreelistSync: true,
ReadOnly: ro,
Timeout: 100 * time.Millisecond,
OpenFile: openFile,
PageSize: pageSize,
})
}

View file

@ -148,6 +148,10 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
} else {
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
zap.String("endpoint", endpoint.Address))
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
cli.switchIsActive.Store(true)
go cli.switchToMostPrioritized(ctx)
}
break
}
}

View file

@ -1,53 +1,13 @@
package container
import (
"crypto/sha256"
"fmt"
v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
// AnnounceLoadPrm groups parameters of AnnounceLoad operation.
type AnnounceLoadPrm struct {
a container.SizeEstimation
key []byte
client.InvokePrmOptional
}
// SetAnnouncement sets announcement.
func (a2 *AnnounceLoadPrm) SetAnnouncement(a container.SizeEstimation) {
a2.a = a
}
// SetReporter sets public key of the reporter.
func (a2 *AnnounceLoadPrm) SetReporter(key []byte) {
a2.key = key
}
// AnnounceLoad saves container size estimation calculated by storage node
// with key in FrostFS system through Container contract call.
//
// Returns any error encountered that caused the saving to interrupt.
func (c *Client) AnnounceLoad(p AnnounceLoadPrm) error {
binCnr := make([]byte, sha256.Size)
p.a.Container().Encode(binCnr)
prm := client.InvokePrm{}
prm.SetMethod(putSizeMethod)
prm.SetArgs(p.a.Epoch(), binCnr, p.a.Value(), p.key)
prm.InvokePrmOptional = p.InvokePrmOptional
_, err := c.client.Invoke(prm)
if err != nil {
return fmt.Errorf("could not invoke method (%s): %w", putSizeMethod, err)
}
return nil
}
// EstimationID is an identity of container load estimation inside Container contract.
type EstimationID []byte

View file

@ -239,15 +239,6 @@ func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPut
return
}
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ContainerAnnounceUsedSpace(ctx, prm)
return err
})
return
}
func (x *multiClient) ObjectDelete(ctx context.Context, p client.PrmObjectDelete) (res *client.ResObjectDelete, err error) {
err = x.iterateClients(ctx, func(c clientcore.Client) error {
res, err = c.ObjectDelete(ctx, p)

View file

@ -81,21 +81,6 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
}
// SetExtendedACL converts gRPC SetExtendedACLRequest message and passes it to internal Container service.
func (s *Server) SetExtendedACL(ctx context.Context, req *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) {
setEACLReq := new(container.SetExtendedACLRequest)
if err := setEACLReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.SetExtendedACL(ctx, setEACLReq)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*containerGRPC.SetExtendedACLResponse), nil
}
// GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service.
func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) {
getEACLReq := new(container.GetExtendedACLRequest)
@ -110,18 +95,3 @@ func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExten
return resp.ToGRPCMessage().(*containerGRPC.GetExtendedACLResponse), nil
}
// AnnounceUsedSpace converts gRPC AnnounceUsedSpaceRequest message and passes it to internal Container service.
func (s *Server) AnnounceUsedSpace(ctx context.Context, req *containerGRPC.AnnounceUsedSpaceRequest) (*containerGRPC.AnnounceUsedSpaceResponse, error) {
announceReq := new(container.AnnounceUsedSpaceRequest)
if err := announceReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.AnnounceUsedSpace(ctx, announceReq)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*containerGRPC.AnnounceUsedSpaceResponse), nil
}

View file

@ -78,15 +78,6 @@ func NewAPEServer(router policyengine.ChainRouter, reader containers, ir ir, nm
}
}
func (ac *apeChecker) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.AnnounceUsedSpace")
defer span.End()
// this method is not used, so not checked
return ac.next.AnnounceUsedSpace(ctx, req)
}
func (ac *apeChecker) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Delete")
defer span.End()
@ -220,7 +211,7 @@ func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*cont
}
}
namespace, err := ac.namespaceByOwner(req.GetBody().GetContainer().GetOwnerID())
namespace, err := ac.namespaceByKnownOwner(req.GetBody().GetContainer().GetOwnerID())
if err != nil {
return nil, fmt.Errorf("get namespace error: %w", err)
}
@ -303,18 +294,6 @@ func (ac *apeChecker) getRoleWithoutContainerID(oID *refs.OwnerID, mh *session.R
return nativeschema.PropertyValueContainerRoleOthers, pk, nil
}
func (ac *apeChecker) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.SetExtendedACL")
defer span.End()
if err := ac.validateContainerBoundedOperation(ctx, req.GetBody().GetEACL().GetContainerID(), req.GetMetaHeader(), req.GetVerificationHeader(),
nativeschema.MethodSetContainerEACL); err != nil {
return nil, err
}
return ac.next.SetExtendedACL(ctx, req)
}
func (ac *apeChecker) validateContainerBoundedOperation(ctx context.Context, containerID *refs.ContainerID, mh *session.RequestMetaHeader, vh *session.RequestVerificationHeader, op string) error {
if vh == nil {
return errMissingVerificationHeader
@ -629,6 +608,25 @@ func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {
return namespace, nil
}
func (ac *apeChecker) namespaceByKnownOwner(owner *refs.OwnerID) (string, error) {
var ownerSDK user.ID
if owner == nil {
return "", errOwnerIDIsNotSet
}
if err := ownerSDK.ReadFromV2(*owner); err != nil {
return "", err
}
addr, err := ownerSDK.ScriptHash()
if err != nil {
return "", err
}
subject, err := ac.frostFSIDClient.GetSubject(addr)
if err != nil {
return "", fmt.Errorf("get subject error: %w", err)
}
return subject.Namespace, nil
}
// validateNamespace validates a namespace set in a container.
// If frostfs-id contract stores a namespace N1 for an owner ID and a container within a request
// is set with namespace N2 (via Zone() property), then N2 is invalid and the request is denied.

View file

@ -9,7 +9,6 @@ import (
"net"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -50,7 +49,6 @@ func TestAPE(t *testing.T) {
t.Run("deny get container by user claim tag", testDenyGetContainerByUserClaimTag)
t.Run("deny get container by IP", testDenyGetContainerByIP)
t.Run("deny get container by group id", testDenyGetContainerByGroupID)
t.Run("deny set container eACL for IR", testDenySetContainerEACLForIR)
t.Run("deny get container eACL for IR with session token", testDenyGetContainerEACLForIRSessionToken)
t.Run("deny put container for others with session token", testDenyPutContainerForOthersSessionToken)
t.Run("deny put container, read namespace from frostfsID", testDenyPutContainerReadNamespaceFromFrostfsID)
@ -665,84 +663,6 @@ func testDenyGetContainerByGroupID(t *testing.T) {
require.ErrorAs(t, err, &errAccessDenied)
}
func testDenySetContainerEACLForIR(t *testing.T) {
t.Parallel()
srv := &srvStub{
calls: map[string]int{},
}
router := inmemory.NewInMemory()
contRdr := &containerStub{
c: map[cid.ID]*containercore.Container{},
}
ir := &irStub{
keys: [][]byte{},
}
nm := &netmapStub{}
frostfsIDSubjectReader := &frostfsidStub{
subjects: map[util.Uint160]*client.Subject{},
}
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
contID := cidtest.ID()
testContainer := containertest.Container()
pp := netmap.PlacementPolicy{}
require.NoError(t, pp.DecodeString("REP 1"))
testContainer.SetPlacementPolicy(pp)
contRdr.c[contID] = &containercore.Container{Value: testContainer}
nm.currentEpoch = 100
nm.netmaps = map[uint64]*netmap.NetMap{}
var testNetmap netmap.NetMap
testNetmap.SetEpoch(nm.currentEpoch)
testNetmap.SetNodes([]netmap.NodeInfo{{}})
nm.netmaps[nm.currentEpoch] = &testNetmap
nm.netmaps[nm.currentEpoch-1] = &testNetmap
_, _, err := router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(contID.EncodeToString()), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
Actions: chain.Actions{
Names: []string{
nativeschema.MethodSetContainerEACL,
},
},
Resources: chain.Resources{
Names: []string{
fmt.Sprintf(nativeschema.ResourceFormatRootContainer, contID.EncodeToString()),
},
},
Condition: []chain.Condition{
{
Kind: chain.KindRequest,
Key: nativeschema.PropertyKeyActorRole,
Value: nativeschema.PropertyValueContainerRoleIR,
Op: chain.CondStringEquals,
},
},
},
},
})
require.NoError(t, err)
req := &container.SetExtendedACLRequest{}
req.SetBody(&container.SetExtendedACLRequestBody{})
var refContID refs.ContainerID
contID.WriteToV2(&refContID)
req.GetBody().SetEACL(&acl.Table{})
req.GetBody().GetEACL().SetContainerID(&refContID)
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
require.NoError(t, signature.SignServiceMessage(&pk.PrivateKey, req))
ir.keys = append(ir.keys, pk.PublicKey().Bytes())
resp, err := apeSrv.SetExtendedACL(context.Background(), req)
require.Nil(t, resp)
var errAccessDenied *apistatus.ObjectAccessDenied
require.ErrorAs(t, err, &errAccessDenied)
}
func testDenyGetContainerEACLForIRSessionToken(t *testing.T) {
t.Parallel()
srv := &srvStub{
@ -845,17 +765,22 @@ func testDenyPutContainerForOthersSessionToken(t *testing.T) {
keys: [][]byte{},
}
nm := &netmapStub{}
frostfsIDSubjectReader := &frostfsidStub{
subjects: map[util.Uint160]*client.Subject{},
}
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
testContainer := containertest.Container()
owner := testContainer.Owner()
ownerAddr, err := owner.ScriptHash()
require.NoError(t, err)
frostfsIDSubjectReader := &frostfsidStub{
subjects: map[util.Uint160]*client.Subject{
ownerAddr: {},
},
}
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
nm.currentEpoch = 100
nm.netmaps = map[uint64]*netmap.NetMap{}
_, _, err := router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.NamespaceTarget(""), &chain.Chain{
_, _, err = router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.NamespaceTarget(""), &chain.Chain{
Rules: []chain.Rule{
{
Status: chain.AccessDenied,
@ -1229,11 +1154,6 @@ type srvStub struct {
calls map[string]int
}
func (s *srvStub) AnnounceUsedSpace(context.Context, *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
s.calls["AnnounceUsedSpace"]++
return &container.AnnounceUsedSpaceResponse{}, nil
}
func (s *srvStub) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) {
s.calls["Delete"]++
return &container.DeleteResponse{}, nil
@ -1259,11 +1179,6 @@ func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutRes
return &container.PutResponse{}, nil
}
func (s *srvStub) SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
s.calls["SetExtendedACL"]++
return &container.SetExtendedACLResponse{}, nil
}
type irStub struct {
keys [][]byte
}

View file

@ -6,7 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
container_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -29,24 +28,6 @@ func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Serv
}
}
// AnnounceUsedSpace implements Server.
func (a *auditService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
res, err := a.next.AnnounceUsedSpace(ctx, req)
if !a.enabled.Load() {
return res, err
}
var ids []*refs.ContainerID
for _, v := range req.GetBody().GetAnnouncements() {
ids = append(ids, v.GetContainerID())
}
audit.LogRequest(a.log, container_grpc.ContainerService_AnnounceUsedSpace_FullMethodName, req,
audit.TargetFromRefs(ids, &cid.ID{}), err == nil)
return res, err
}
// Delete implements Server.
func (a *auditService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
res, err := a.next.Delete(ctx, req)
@ -103,14 +84,3 @@ func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*con
audit.TargetFromRef(res.GetBody().GetContainerID(), &cid.ID{}), err == nil)
return res, err
}
// SetExtendedACL implements Server.
func (a *auditService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
res, err := a.next.SetExtendedACL(ctx, req)
if !a.enabled.Load() {
return res, err
}
audit.LogRequest(a.log, container_grpc.ContainerService_SetExtendedACL_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetEACL().GetContainerID(), &cid.ID{}), err == nil)
return res, err
}

View file

@ -14,7 +14,6 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
SetExtendedACL(context.Context, *session.Token, *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error)
GetExtendedACL(context.Context, *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error)
}
@ -96,24 +95,6 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
return resp, nil
}
func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
meta := req.GetMetaHeader()
for origin := meta.GetOrigin(); origin != nil; origin = meta.GetOrigin() {
meta = origin
}
respBody, err := s.exec.SetExtendedACL(ctx, meta.GetSessionToken(), req.GetBody())
if err != nil {
return nil, fmt.Errorf("could not execute SetEACL request: %w", err)
}
resp := new(container.SetExtendedACLResponse)
resp.SetBody(respBody)
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
respBody, err := s.exec.GetExtendedACL(ctx, req.GetBody())
if err != nil {

View file

@ -13,8 +13,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var errMissingUserID = errors.New("missing user ID")
@ -204,10 +202,6 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil
}
func (s *morphExecutor) SetExtendedACL(_ context.Context, _ *sessionV2.Token, _ *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetExtendedACL not implemented")
}
func (s *morphExecutor) GetExtendedACL(_ context.Context, body *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error) {
idV2 := body.GetContainerID()
if idV2 == nil {

View file

@ -12,7 +12,5 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error)
GetExtendedACL(context.Context, *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error)
AnnounceUsedSpace(context.Context, *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error)
}

View file

@ -57,15 +57,6 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.SetExtendedACLResponse)
return resp, s.sigSvc.SignResponse(resp, err)
}
resp, err := util.EnsureNonNilResponse(s.svc.SetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.GetExtendedACLResponse)
@ -74,12 +65,3 @@ func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExte
resp, err := util.EnsureNonNilResponse(s.svc.GetExtendedACL(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.AnnounceUsedSpaceResponse)
return resp, s.sigSvc.SignResponse(resp, err)
}
resp, err := util.EnsureNonNilResponse(s.svc.AnnounceUsedSpace(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}

Binary file not shown.

Binary file not shown.

View file

@ -26,4 +26,7 @@ enum HealthStatus {
// IR application is shutting down.
SHUTTING_DOWN = 3;
// IR application is reconfiguring.
RECONFIGURING = 4;
}

View file

@ -12,7 +12,7 @@ import (
)
func (r *request) assemble(ctx context.Context) {
if !r.canAssemble() {
if !r.canAssembleComplexObject() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheObject)
r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
r.log.Debug(logs.GetAssemblingSplittedObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()),

View file

@ -11,7 +11,7 @@ import (
)
func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() && r.isLocal() {
if r.isRaw() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
}
r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()),

View file

@ -19,6 +19,7 @@ type assembler struct {
splitInfo *objectSDK.SplitInfo
rng *objectSDK.Range
objGetter objectGetter
head bool
currentOffset uint64
@ -30,18 +31,23 @@ func newAssembler(
splitInfo *objectSDK.SplitInfo,
rng *objectSDK.Range,
objGetter objectGetter,
head bool,
) *assembler {
return &assembler{
addr: addr,
rng: rng,
splitInfo: splitInfo,
objGetter: objGetter,
head: head,
}
}
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
// It returns parent object.
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if a.head {
return a.assembleHeader(ctx, writer)
}
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
if !ok {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
return a.parentObject, nil
}
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
var sourceObjectIDs []oid.ID
sourceObjectID, ok := a.splitInfo.Link()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
sourceObjectID, ok = a.splitInfo.LastPart()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
if len(sourceObjectIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
for _, sourceObjectID = range sourceObjectIDs {
obj, err := a.getParent(ctx, sourceObjectID, writer)
if err == nil {
return obj, nil
}
}
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
if err != nil {
return nil, err
}
parent := obj.Parent()
if parent == nil {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if err := writer.WriteHeader(ctx, parent); err != nil {
return nil, err
}
return obj, nil
}
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
sourceObjectID, ok := a.splitInfo.Link()
if ok {

View file

@ -37,7 +37,6 @@ type assemblerec struct {
cs container.Source
log *logger.Logger
head bool
raw bool
traverserGenerator traverserGenerator
epoch uint64
}
@ -51,7 +50,6 @@ func newAssemblerEC(
cs container.Source,
log *logger.Logger,
head bool,
raw bool,
tg traverserGenerator,
epoch uint64,
) *assemblerec {
@ -64,7 +62,6 @@ func newAssemblerEC(
cs: cs,
log: log,
head: head,
raw: raw,
traverserGenerator: tg,
epoch: epoch,
}
@ -74,9 +71,6 @@ func newAssemblerEC(
// It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
switch {
case a.raw:
err := a.reconstructRawError(ctx)
return nil, err
case a.head:
return a.reconstructHeader(ctx, writer)
case a.rng != nil:
@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
return c.Reconstruct(parts)
}
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
chunks := make(map[string]objectSDK.ECChunk)
var chunksGuard sync.Mutex
for _, ch := range a.ecInfo.localChunks {
chunks[string(ch.ID.GetValue())] = ch
}
objID := a.addr.Object()
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for {
batch := trav.Next()
if len(batch) == 0 {
break
}
for _, node := range batch {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
return nil
}
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
chunksGuard.Lock()
defer chunksGuard.Unlock()
for _, ch := range nodeChunks {
chunks[string(ch.ID.GetValue())] = ch
}
return nil
})
}
}
if err = eg.Wait(); err != nil {
return err
}
return createECInfoErr(chunks)
}
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
@ -293,7 +237,7 @@ func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch object
return nil
}
var addr oid.Address
addr.SetContainer(addr.Container())
addr.SetContainer(a.addr.Container())
addr.SetObject(objID)
var object *objectSDK.Object
if a.head {
@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
}
return object
}
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
info := objectSDK.NewECInfo()
for _, ch := range chunks {
info.AddChunk(ch)
}
return objectSDK.NewECInfoError(info)
}

View file

@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC:
exec.log.Debug(logs.GetRequestedObjectIsEC)
if exec.isRaw() && execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
}
exec.assembleEC(ctx)
default:
exec.log.Debug(logs.OperationFinishedWithError,

View file

@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
t.Run("VIRTUAL", func(t *testing.T) {
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
headPrm := newHeadPrm(false, nil)
headPrm := newHeadPrm(true, nil)
headPrm.WithAddress(addr)
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

View file

@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
switch {
default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status != statusEC {
// for raw requests, continue to collect other parts
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
}
return false
case err == nil:
r.status = statusOK
r.err = nil
@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.collectedObject = obj
r.writeCollectedObject(ctx)
}
return true
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
return true
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
return true
case errors.As(err, &errSplitInfo):
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
return true
case errors.As(err, &errECInfo):
r.status = statusEC
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
if r.isRaw() {
return false // continue to collect all parts
}
return true
}
return r.status != statusUndefined
}
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {

View file

@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
return r.keyStore.GetKey(sessionInfo)
}
func (r *request) canAssemble() bool {
return !r.isRaw() && !r.headOnly()
func (r *request) canAssembleComplexObject() bool {
return !r.isRaw()
}
func (r *request) splitInfo() *objectSDK.SplitInfo {

View file

@ -17,6 +17,7 @@ import (
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -39,7 +40,7 @@ type ecWriter struct {
}
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx)
relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil {
return err
}
@ -65,7 +66,7 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return e.writeRawObject(ctx, obj)
}
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil {
return false, nil
}
@ -77,7 +78,13 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
// object can be splitted or saved local
return false, nil
}
if err := e.relayToContainerNode(ctx); err != nil {
objID := object.AddressOf(obj).Object()
var index uint32
if obj.ECHeader() != nil {
objID = obj.ECHeader().Parent()
index = obj.ECHeader().Index()
}
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err
}
return true, nil
@ -102,18 +109,20 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
return false, nil
}
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
t, err := placement.NewTraverser(e.placementOpts...)
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
var lastErr error
offset := int(index)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for _, node := range nodes {
for idx := range nodes {
node := nodes[(idx+offset)%len(nodes)]
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
@ -149,6 +158,10 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
}
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj)
}
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil {
return err

View file

@ -19,6 +19,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -229,6 +230,9 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
if len(copiesNumber) > 0 && !result.isEC {
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
}
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
}
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
objID, ok := obj.ID()

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
@ -212,10 +213,10 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm)
return p.newDefaultObjectWriter(prm, false)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error
if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error {
@ -232,9 +233,16 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWri
}
}
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
}
return &distributedTarget{
cfg: p.cfg,
placementOpts: prm.traverseOpts,
placementOpts: traverseOpts,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{
@ -266,7 +274,7 @@ func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm),
repWriter: p.newDefaultObjectWriter(prm, true),
}
}

Binary file not shown.