Compare commits
14 commits
master
...
fix/ec_put
Author | SHA1 | Date | |
---|---|---|---|
fad4692556 | |||
54862250b2 | |||
89d670c2f0 | |||
87c5954f4e | |||
28fc41bd98 | |||
20f308876c | |||
02d191e9a6 | |||
cecb3f7867 | |||
f53d30fa95 | |||
1b92817bd3 | |||
32ec421ac7 | |||
4d102b05e5 | |||
308da7cb01 | |||
37b83c0856 |
42 changed files with 221 additions and 550 deletions
|
@ -38,6 +38,12 @@ var (
|
|||
func parseTarget(cmd *cobra.Command) policyengine.Target {
|
||||
name, _ := cmd.Flags().GetString(targetNameFlag)
|
||||
typ, err := parseTargetType(cmd)
|
||||
|
||||
// interpret "root" namespace as empty
|
||||
if typ == policyengine.Namespace && name == "root" {
|
||||
name = ""
|
||||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "read target type error: %w", err)
|
||||
|
||||
return policyengine.Target{
|
||||
|
|
|
@ -214,29 +214,6 @@ func EACL(ctx context.Context, prm EACLPrm) (res EACLRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// SetEACLPrm groups parameters of SetEACL operation.
|
||||
type SetEACLPrm struct {
|
||||
Client *client.Client
|
||||
ClientParams client.PrmContainerSetEACL
|
||||
}
|
||||
|
||||
// SetEACLRes groups the resulting values of SetEACL operation.
|
||||
type SetEACLRes struct{}
|
||||
|
||||
// SetEACL requests to save an eACL table in FrostFS.
|
||||
//
|
||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
||||
// The required time is also not predictable.
|
||||
//
|
||||
// Success can be verified by reading by container identifier.
|
||||
//
|
||||
// Returns any error which prevented the operation from completing correctly in error return.
|
||||
func SetEACL(ctx context.Context, prm SetEACLPrm) (res SetEACLRes, err error) {
|
||||
_, err = prm.Client.ContainerSetEACL(ctx, prm.ClientParams)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// NetworkInfoPrm groups parameters of NetworkInfo operation.
|
||||
type NetworkInfoPrm struct {
|
||||
Client *client.Client
|
||||
|
|
|
@ -26,7 +26,6 @@ func init() {
|
|||
listContainerObjectsCmd,
|
||||
getContainerInfoCmd,
|
||||
getExtendedACLCmd,
|
||||
setExtendedACLCmd,
|
||||
containerNodesCmd,
|
||||
policyPlaygroundCmd,
|
||||
}
|
||||
|
@ -39,7 +38,6 @@ func init() {
|
|||
initContainerListObjectsCmd()
|
||||
initContainerInfoCmd()
|
||||
initContainerGetEACLCmd()
|
||||
initContainerSetEACLCmd()
|
||||
initContainerNodesCmd()
|
||||
initContainerPolicyPlaygroundCmd()
|
||||
|
||||
|
@ -53,7 +51,6 @@ func init() {
|
|||
}{
|
||||
{createContainerCmd, "PUT"},
|
||||
{deleteContainerCmd, "DELETE"},
|
||||
{setExtendedACLCmd, "SETEACL"},
|
||||
} {
|
||||
commonflags.InitSession(el.cmd, "container "+el.verb)
|
||||
}
|
||||
|
|
|
@ -1,108 +0,0 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var flagVarsSetEACL struct {
|
||||
noPreCheck bool
|
||||
|
||||
srcPath string
|
||||
}
|
||||
|
||||
var setExtendedACLCmd = &cobra.Command{
|
||||
Use: "set-eacl",
|
||||
Short: "Set new extended ACL table for container",
|
||||
Long: `Set new extended ACL table for container.
|
||||
Container ID in EACL table will be substituted with ID from the CLI.`,
|
||||
Run: func(cmd *cobra.Command, _ []string) {
|
||||
id := parseContainerID(cmd)
|
||||
eaclTable := common.ReadEACL(cmd, flagVarsSetEACL.srcPath)
|
||||
|
||||
tok := getSession(cmd)
|
||||
|
||||
eaclTable.SetCID(id)
|
||||
|
||||
pk := key.GetOrGenerate(cmd)
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
||||
if !flagVarsSetEACL.noPreCheck {
|
||||
cmd.Println("Checking the ability to modify access rights in the container...")
|
||||
|
||||
extendable, err := internalclient.IsACLExtendable(cmd.Context(), cli, id)
|
||||
commonCmd.ExitOnErr(cmd, "Extensibility check failure: %w", err)
|
||||
|
||||
if !extendable {
|
||||
commonCmd.ExitOnErr(cmd, "", errors.New("container ACL is immutable"))
|
||||
}
|
||||
|
||||
cmd.Println("ACL extension is enabled in the container, continue processing.")
|
||||
}
|
||||
|
||||
setEACLPrm := internalclient.SetEACLPrm{
|
||||
Client: cli,
|
||||
ClientParams: client.PrmContainerSetEACL{
|
||||
Table: eaclTable,
|
||||
Session: tok,
|
||||
},
|
||||
}
|
||||
|
||||
_, err := internalclient.SetEACL(cmd.Context(), setEACLPrm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
if containerAwait {
|
||||
exp, err := eaclTable.Marshal()
|
||||
commonCmd.ExitOnErr(cmd, "broken EACL table: %w", err)
|
||||
|
||||
cmd.Println("awaiting...")
|
||||
|
||||
getEACLPrm := internalclient.EACLPrm{
|
||||
Client: cli,
|
||||
ClientParams: client.PrmContainerEACL{
|
||||
ContainerID: &id,
|
||||
},
|
||||
}
|
||||
|
||||
for i := 0; i < awaitTimeout; i++ {
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
res, err := internalclient.EACL(cmd.Context(), getEACLPrm)
|
||||
if err == nil {
|
||||
// compare binary values because EACL could have been set already
|
||||
table := res.EACL()
|
||||
got, err := table.Marshal()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if bytes.Equal(exp, got) {
|
||||
cmd.Println("EACL has been persisted on sidechain")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "", errSetEACLTimeout)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func initContainerSetEACLCmd() {
|
||||
commonflags.Init(setExtendedACLCmd)
|
||||
|
||||
flags := setExtendedACLCmd.Flags()
|
||||
flags.StringVar(&containerID, commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
flags.StringVar(&flagVarsSetEACL.srcPath, "table", "", "path to file with JSON or binary encoded EACL table")
|
||||
flags.BoolVar(&containerAwait, "await", false, "block execution until EACL is persisted")
|
||||
flags.BoolVar(&flagVarsSetEACL.noPreCheck, "no-precheck", false, "do not pre-check the extensibility of the container ACL")
|
||||
}
|
|
@ -18,9 +18,8 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
errCreateTimeout = errors.New("timeout: container has not been persisted on sidechain")
|
||||
errDeleteTimeout = errors.New("timeout: container has not been removed from sidechain")
|
||||
errSetEACLTimeout = errors.New("timeout: EACL has not been persisted on sidechain")
|
||||
errCreateTimeout = errors.New("timeout: container has not been persisted on sidechain")
|
||||
errDeleteTimeout = errors.New("timeout: container has not been removed from sidechain")
|
||||
)
|
||||
|
||||
func parseContainerID(cmd *cobra.Command) cid.ID {
|
||||
|
|
|
@ -30,7 +30,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
verifyPresenceAllFlag = "verify-presence-all"
|
||||
verifyPresenceAllFlag = "verify-presence-all"
|
||||
preferInternalAddressesFlag = "prefer-internal-addresses"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -97,6 +98,7 @@ func initObjectNodesCmd() {
|
|||
|
||||
flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes.")
|
||||
flags.Bool(commonflags.JSON, false, "Print information about the object placement as json.")
|
||||
flags.Bool(preferInternalAddressesFlag, false, "Use internal addresses first to get object info.")
|
||||
}
|
||||
|
||||
func objectNodes(cmd *cobra.Command, _ []string) {
|
||||
|
@ -449,11 +451,20 @@ func getNodesToCheckObjectExistance(cmd *cobra.Command, netmap *netmapSDK.NetMap
|
|||
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
|
||||
var cli *client.Client
|
||||
var addresses []string
|
||||
candidate.IterateNetworkEndpoints(func(s string) bool {
|
||||
addresses = append(addresses, s)
|
||||
return false
|
||||
})
|
||||
addresses = append(addresses, candidate.ExternalAddresses()...)
|
||||
if preferInternal, _ := cmd.Flags().GetBool(preferInternalAddressesFlag); preferInternal {
|
||||
candidate.IterateNetworkEndpoints(func(s string) bool {
|
||||
addresses = append(addresses, s)
|
||||
return false
|
||||
})
|
||||
addresses = append(addresses, candidate.ExternalAddresses()...)
|
||||
} else {
|
||||
addresses = append(addresses, candidate.ExternalAddresses()...)
|
||||
candidate.IterateNetworkEndpoints(func(s string) bool {
|
||||
addresses = append(addresses, s)
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for _, address := range addresses {
|
||||
var networkAddr network.Address
|
||||
|
|
10
go.mod
10
go.mod
|
@ -4,11 +4,11 @@ go 1.21
|
|||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240530152826-2f6d3209e1d3
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240617140730-1a5886e776de
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
|
@ -23,7 +23,7 @@ require (
|
|||
github.com/mitchellh/go-homedir v1.1.0
|
||||
github.com/mr-tron/base58 v1.2.0
|
||||
github.com/multiformats/go-multiaddr v0.12.1
|
||||
github.com/nspcc-dev/neo-go v0.106.0
|
||||
github.com/nspcc-dev/neo-go v0.106.2
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/panjf2000/ants/v2 v2.9.0
|
||||
github.com/paulmach/orb v0.11.0
|
||||
|
@ -38,7 +38,7 @@ require (
|
|||
go.opentelemetry.io/otel v1.22.0
|
||||
go.opentelemetry.io/otel/trace v1.22.0
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
|
||||
golang.org/x/sync v0.6.0
|
||||
golang.org/x/sys v0.18.0
|
||||
golang.org/x/term v0.18.0
|
||||
|
@ -128,4 +128,4 @@ require (
|
|||
rsc.io/tmplfunc v0.0.3 // indirect
|
||||
)
|
||||
|
||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240611123832-594f716b3d18
|
||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -11,7 +11,6 @@ import (
|
|||
// Client is an interface of FrostFS storage
|
||||
// node's client.
|
||||
type Client interface {
|
||||
ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error)
|
||||
ObjectPutInit(context.Context, client.PrmObjectPutInit) (client.ObjectWriter, error)
|
||||
ObjectPutSingle(context.Context, client.PrmObjectPutSingle) (*client.ResObjectPutSingle, error)
|
||||
ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error)
|
||||
|
|
|
@ -3,7 +3,6 @@ package container
|
|||
import (
|
||||
"crypto/ecdsa"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -238,5 +237,5 @@ func (c *testMorphClient) NotarySignAndInvokeTX(mainTx *transaction.Transaction)
|
|||
type testFrostFSIDClient struct{}
|
||||
|
||||
func (c *testFrostFSIDClient) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error) {
|
||||
return nil, fmt.Errorf("subject not found")
|
||||
return &frostfsidclient.Subject{}, nil
|
||||
}
|
||||
|
|
|
@ -180,11 +180,6 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
|
|||
}
|
||||
}
|
||||
|
||||
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
|
||||
if !hasNamespace {
|
||||
return nil
|
||||
}
|
||||
|
||||
addr, err := util.Uint160DecodeBytesBE(cnr.Owner().WalletBytes()[1 : 1+util.Uint160Size])
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get container owner address: %w", err)
|
||||
|
@ -195,6 +190,11 @@ func (cp *Processor) checkNNS(ctx *putContainerContext, cnr containerSDK.Contain
|
|||
return fmt.Errorf("could not get subject from FrostfsID contract: %w", err)
|
||||
}
|
||||
|
||||
namespace, hasNamespace := strings.CutSuffix(ctx.d.Zone(), ".ns")
|
||||
if !hasNamespace {
|
||||
return nil
|
||||
}
|
||||
|
||||
if subject.Namespace != namespace {
|
||||
return errContainerAndOwnerNamespaceDontMatch
|
||||
}
|
||||
|
|
|
@ -100,7 +100,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
|||
return false
|
||||
} else {
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
e.reportShardError(sh, "could not check object existence", err)
|
||||
e.reportShardError(sh, "could not check object existence", err, zap.Stringer("address", prm.addr))
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
|||
|
||||
_, err = sh.Inhume(ctx, shPrm)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
||||
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", prm.addr))
|
||||
|
||||
var target *apistatus.ObjectLocked
|
||||
locked.is = errors.As(err, &target)
|
||||
|
@ -191,7 +191,7 @@ func (e *StorageEngine) deleteChunks(
|
|||
var objID oid.ID
|
||||
err := objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not delete EC chunk", err)
|
||||
e.reportShardError(sh, "could not delete EC chunk", err, zap.Stringer("address", prm.addr))
|
||||
}
|
||||
addr.SetObject(objID)
|
||||
inhumePrm.MarkAsGarbage(addr)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// exists return in the first value true if object exists.
|
||||
|
@ -36,7 +37,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool
|
|||
}
|
||||
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
e.reportShardError(sh, "could not check existence of object in shard", err)
|
||||
e.reportShardError(sh, "could not check existence of object in shard", err, zap.Stringer("address", shPrm.Address))
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
|||
i.ObjectExpired = true
|
||||
return true
|
||||
default:
|
||||
i.Engine.reportShardError(sh, "could not get object from shard", err)
|
||||
i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
|
||||
return false
|
||||
}
|
||||
})
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// HeadPrm groups the parameters of Head operation.
|
||||
|
@ -118,7 +119,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
outError = new(apistatus.ObjectNotFound)
|
||||
return true
|
||||
default:
|
||||
e.reportShardError(sh, "could not head object from shard", err)
|
||||
e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("address", prm.addr))
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -126,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
return true
|
||||
})
|
||||
|
||||
if head != nil {
|
||||
return HeadRes{head: head}, nil
|
||||
}
|
||||
if outSI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||
} else if outEI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||
} else if head == nil {
|
||||
return HeadRes{}, outError
|
||||
}
|
||||
|
||||
return HeadRes{
|
||||
head: head,
|
||||
}, nil
|
||||
if outEI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||
}
|
||||
return HeadRes{}, outError
|
||||
}
|
||||
|
||||
// Head reads object header from local storage by provided address.
|
||||
|
|
|
@ -154,7 +154,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
var siErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
|
||||
e.reportShardError(sh, "could not check for presents in shard", err)
|
||||
e.reportShardError(sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -179,7 +179,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
return true
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
||||
e.reportShardError(sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -205,7 +205,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
|
|||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||
locked, err = h.Shard.IsLocked(ctx, addr)
|
||||
if err != nil {
|
||||
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
|
||||
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("address", addr),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
outErr = err
|
||||
return false
|
||||
|
@ -235,7 +235,7 @@ func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.
|
|||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||
ld, err := h.Shard.GetLocked(ctx, addr)
|
||||
if err != nil {
|
||||
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
|
||||
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
outErr = err
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errLockFailed = errors.New("lock operation failed")
|
||||
|
@ -62,11 +63,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
|
|||
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
||||
// code is pretty similar to inhumeAddr, maybe unify?
|
||||
root := false
|
||||
|
||||
var addrLocked oid.Address
|
||||
addrLocked.SetContainer(idCnr)
|
||||
addrLocked.SetObject(locked)
|
||||
|
||||
e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) {
|
||||
defer func() {
|
||||
// if object is root we continue since information about it
|
||||
|
@ -79,7 +78,6 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
|||
if checkExists {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.Address = addrLocked
|
||||
|
||||
exRes, err := sh.Exists(ctx, existsPrm)
|
||||
if err != nil {
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
|
@ -90,14 +88,16 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
|||
var objID oid.ID
|
||||
err = objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not lock object in shard", err)
|
||||
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||
return false
|
||||
}
|
||||
eclocked = append(eclocked, objID)
|
||||
}
|
||||
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not lock object in shard", err)
|
||||
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||
return false
|
||||
}
|
||||
root = true
|
||||
|
@ -108,8 +108,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
|||
// do not lock it
|
||||
return true
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not check locked object for presence in shard", err)
|
||||
e.reportShardError(sh, "could not check locked object for presence in shard", err, zap.Stringer("container_id", idCnr),
|
||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -121,21 +121,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
|||
|
||||
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not lock object in shard", err)
|
||||
e.reportShardError(sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||
|
||||
var errIrregular *apistatus.LockNonRegularObject
|
||||
if errors.As(err, &errIrregular) {
|
||||
status = 1
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
status = 2
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -187,7 +187,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
|
|||
return
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not put object to shard", err)
|
||||
e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", addr))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -208,7 +208,7 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
i.Engine.reportShardError(sh, "could not get object from shard", err)
|
||||
i.Engine.reportShardError(sh, "could not get object from shard", err, zap.Stringer("address", i.Address))
|
||||
return false
|
||||
}
|
||||
})
|
||||
|
|
|
@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
|||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
// pick last item, for now there is not difference which address to pick
|
||||
// but later list might be sorted so first or last value can be more
|
||||
// prioritized to choose
|
||||
virtualOID := relativeLst[len(relativeLst)-1]
|
||||
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
||||
var data []byte
|
||||
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
|
||||
virtualOID := relativeLst[len(relativeLst)-i-1]
|
||||
data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
// check if any of the relatives is an EC object
|
||||
for _, relative := range relativeLst {
|
||||
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
|
||||
if len(data) > 0 {
|
||||
// we can't return object headers, but can return error,
|
||||
// so assembler can try to assemble complex object
|
||||
return nil, getSplitInfoError(tx, cnr, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
child := objectSDK.New()
|
||||
|
||||
|
|
|
@ -148,6 +148,10 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
} else {
|
||||
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
|
||||
zap.String("endpoint", endpoint.Address))
|
||||
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
|
||||
cli.switchIsActive.Store(true)
|
||||
go cli.switchToMostPrioritized(ctx)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,53 +1,13 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
|
||||
v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
// AnnounceLoadPrm groups parameters of AnnounceLoad operation.
|
||||
type AnnounceLoadPrm struct {
|
||||
a container.SizeEstimation
|
||||
key []byte
|
||||
|
||||
client.InvokePrmOptional
|
||||
}
|
||||
|
||||
// SetAnnouncement sets announcement.
|
||||
func (a2 *AnnounceLoadPrm) SetAnnouncement(a container.SizeEstimation) {
|
||||
a2.a = a
|
||||
}
|
||||
|
||||
// SetReporter sets public key of the reporter.
|
||||
func (a2 *AnnounceLoadPrm) SetReporter(key []byte) {
|
||||
a2.key = key
|
||||
}
|
||||
|
||||
// AnnounceLoad saves container size estimation calculated by storage node
|
||||
// with key in FrostFS system through Container contract call.
|
||||
//
|
||||
// Returns any error encountered that caused the saving to interrupt.
|
||||
func (c *Client) AnnounceLoad(p AnnounceLoadPrm) error {
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
p.a.Container().Encode(binCnr)
|
||||
|
||||
prm := client.InvokePrm{}
|
||||
prm.SetMethod(putSizeMethod)
|
||||
prm.SetArgs(p.a.Epoch(), binCnr, p.a.Value(), p.key)
|
||||
prm.InvokePrmOptional = p.InvokePrmOptional
|
||||
|
||||
_, err := c.client.Invoke(prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not invoke method (%s): %w", putSizeMethod, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// EstimationID is an identity of container load estimation inside Container contract.
|
||||
type EstimationID []byte
|
||||
|
||||
|
|
9
pkg/network/cache/multi.go
vendored
9
pkg/network/cache/multi.go
vendored
|
@ -239,15 +239,6 @@ func (x *multiClient) ObjectPutSingle(ctx context.Context, p client.PrmObjectPut
|
|||
return
|
||||
}
|
||||
|
||||
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) {
|
||||
err = x.iterateClients(ctx, func(c clientcore.Client) error {
|
||||
res, err = c.ContainerAnnounceUsedSpace(ctx, prm)
|
||||
return err
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (x *multiClient) ObjectDelete(ctx context.Context, p client.PrmObjectDelete) (res *client.ResObjectDelete, err error) {
|
||||
err = x.iterateClients(ctx, func(c clientcore.Client) error {
|
||||
res, err = c.ObjectDelete(ctx, p)
|
||||
|
|
|
@ -81,21 +81,6 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
|||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||
}
|
||||
|
||||
// SetExtendedACL converts gRPC SetExtendedACLRequest message and passes it to internal Container service.
|
||||
func (s *Server) SetExtendedACL(ctx context.Context, req *containerGRPC.SetExtendedACLRequest) (*containerGRPC.SetExtendedACLResponse, error) {
|
||||
setEACLReq := new(container.SetExtendedACLRequest)
|
||||
if err := setEACLReq.FromGRPCMessage(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := s.srv.SetExtendedACL(ctx, setEACLReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.ToGRPCMessage().(*containerGRPC.SetExtendedACLResponse), nil
|
||||
}
|
||||
|
||||
// GetExtendedACL converts gRPC GetExtendedACLRequest message and passes it to internal Container service.
|
||||
func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExtendedACLRequest) (*containerGRPC.GetExtendedACLResponse, error) {
|
||||
getEACLReq := new(container.GetExtendedACLRequest)
|
||||
|
@ -110,18 +95,3 @@ func (s *Server) GetExtendedACL(ctx context.Context, req *containerGRPC.GetExten
|
|||
|
||||
return resp.ToGRPCMessage().(*containerGRPC.GetExtendedACLResponse), nil
|
||||
}
|
||||
|
||||
// AnnounceUsedSpace converts gRPC AnnounceUsedSpaceRequest message and passes it to internal Container service.
|
||||
func (s *Server) AnnounceUsedSpace(ctx context.Context, req *containerGRPC.AnnounceUsedSpaceRequest) (*containerGRPC.AnnounceUsedSpaceResponse, error) {
|
||||
announceReq := new(container.AnnounceUsedSpaceRequest)
|
||||
if err := announceReq.FromGRPCMessage(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := s.srv.AnnounceUsedSpace(ctx, announceReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.ToGRPCMessage().(*containerGRPC.AnnounceUsedSpaceResponse), nil
|
||||
}
|
||||
|
|
|
@ -78,15 +78,6 @@ func NewAPEServer(router policyengine.ChainRouter, reader containers, ir ir, nm
|
|||
}
|
||||
}
|
||||
|
||||
func (ac *apeChecker) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.AnnounceUsedSpace")
|
||||
defer span.End()
|
||||
|
||||
// this method is not used, so not checked
|
||||
|
||||
return ac.next.AnnounceUsedSpace(ctx, req)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Delete")
|
||||
defer span.End()
|
||||
|
@ -220,7 +211,7 @@ func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*cont
|
|||
}
|
||||
}
|
||||
|
||||
namespace, err := ac.namespaceByOwner(req.GetBody().GetContainer().GetOwnerID())
|
||||
namespace, err := ac.namespaceByKnownOwner(req.GetBody().GetContainer().GetOwnerID())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get namespace error: %w", err)
|
||||
}
|
||||
|
@ -303,18 +294,6 @@ func (ac *apeChecker) getRoleWithoutContainerID(oID *refs.OwnerID, mh *session.R
|
|||
return nativeschema.PropertyValueContainerRoleOthers, pk, nil
|
||||
}
|
||||
|
||||
func (ac *apeChecker) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.SetExtendedACL")
|
||||
defer span.End()
|
||||
|
||||
if err := ac.validateContainerBoundedOperation(ctx, req.GetBody().GetEACL().GetContainerID(), req.GetMetaHeader(), req.GetVerificationHeader(),
|
||||
nativeschema.MethodSetContainerEACL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ac.next.SetExtendedACL(ctx, req)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) validateContainerBoundedOperation(ctx context.Context, containerID *refs.ContainerID, mh *session.RequestMetaHeader, vh *session.RequestVerificationHeader, op string) error {
|
||||
if vh == nil {
|
||||
return errMissingVerificationHeader
|
||||
|
@ -629,6 +608,25 @@ func (ac *apeChecker) namespaceByOwner(owner *refs.OwnerID) (string, error) {
|
|||
return namespace, nil
|
||||
}
|
||||
|
||||
func (ac *apeChecker) namespaceByKnownOwner(owner *refs.OwnerID) (string, error) {
|
||||
var ownerSDK user.ID
|
||||
if owner == nil {
|
||||
return "", errOwnerIDIsNotSet
|
||||
}
|
||||
if err := ownerSDK.ReadFromV2(*owner); err != nil {
|
||||
return "", err
|
||||
}
|
||||
addr, err := ownerSDK.ScriptHash()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
subject, err := ac.frostFSIDClient.GetSubject(addr)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("get subject error: %w", err)
|
||||
}
|
||||
return subject.Namespace, nil
|
||||
}
|
||||
|
||||
// validateNamespace validates a namespace set in a container.
|
||||
// If frostfs-id contract stores a namespace N1 for an owner ID and a container within a request
|
||||
// is set with namespace N2 (via Zone() property), then N2 is invalid and the request is denied.
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"net"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
|
@ -50,7 +49,6 @@ func TestAPE(t *testing.T) {
|
|||
t.Run("deny get container by user claim tag", testDenyGetContainerByUserClaimTag)
|
||||
t.Run("deny get container by IP", testDenyGetContainerByIP)
|
||||
t.Run("deny get container by group id", testDenyGetContainerByGroupID)
|
||||
t.Run("deny set container eACL for IR", testDenySetContainerEACLForIR)
|
||||
t.Run("deny get container eACL for IR with session token", testDenyGetContainerEACLForIRSessionToken)
|
||||
t.Run("deny put container for others with session token", testDenyPutContainerForOthersSessionToken)
|
||||
t.Run("deny put container, read namespace from frostfsID", testDenyPutContainerReadNamespaceFromFrostfsID)
|
||||
|
@ -665,84 +663,6 @@ func testDenyGetContainerByGroupID(t *testing.T) {
|
|||
require.ErrorAs(t, err, &errAccessDenied)
|
||||
}
|
||||
|
||||
func testDenySetContainerEACLForIR(t *testing.T) {
|
||||
t.Parallel()
|
||||
srv := &srvStub{
|
||||
calls: map[string]int{},
|
||||
}
|
||||
router := inmemory.NewInMemory()
|
||||
contRdr := &containerStub{
|
||||
c: map[cid.ID]*containercore.Container{},
|
||||
}
|
||||
ir := &irStub{
|
||||
keys: [][]byte{},
|
||||
}
|
||||
nm := &netmapStub{}
|
||||
frostfsIDSubjectReader := &frostfsidStub{
|
||||
subjects: map[util.Uint160]*client.Subject{},
|
||||
}
|
||||
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
|
||||
|
||||
contID := cidtest.ID()
|
||||
testContainer := containertest.Container()
|
||||
pp := netmap.PlacementPolicy{}
|
||||
require.NoError(t, pp.DecodeString("REP 1"))
|
||||
testContainer.SetPlacementPolicy(pp)
|
||||
contRdr.c[contID] = &containercore.Container{Value: testContainer}
|
||||
|
||||
nm.currentEpoch = 100
|
||||
nm.netmaps = map[uint64]*netmap.NetMap{}
|
||||
var testNetmap netmap.NetMap
|
||||
testNetmap.SetEpoch(nm.currentEpoch)
|
||||
testNetmap.SetNodes([]netmap.NodeInfo{{}})
|
||||
nm.netmaps[nm.currentEpoch] = &testNetmap
|
||||
nm.netmaps[nm.currentEpoch-1] = &testNetmap
|
||||
|
||||
_, _, err := router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.ContainerTarget(contID.EncodeToString()), &chain.Chain{
|
||||
Rules: []chain.Rule{
|
||||
{
|
||||
Status: chain.AccessDenied,
|
||||
Actions: chain.Actions{
|
||||
Names: []string{
|
||||
nativeschema.MethodSetContainerEACL,
|
||||
},
|
||||
},
|
||||
Resources: chain.Resources{
|
||||
Names: []string{
|
||||
fmt.Sprintf(nativeschema.ResourceFormatRootContainer, contID.EncodeToString()),
|
||||
},
|
||||
},
|
||||
Condition: []chain.Condition{
|
||||
{
|
||||
Kind: chain.KindRequest,
|
||||
Key: nativeschema.PropertyKeyActorRole,
|
||||
Value: nativeschema.PropertyValueContainerRoleIR,
|
||||
Op: chain.CondStringEquals,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
req := &container.SetExtendedACLRequest{}
|
||||
req.SetBody(&container.SetExtendedACLRequestBody{})
|
||||
var refContID refs.ContainerID
|
||||
contID.WriteToV2(&refContID)
|
||||
req.GetBody().SetEACL(&acl.Table{})
|
||||
req.GetBody().GetEACL().SetContainerID(&refContID)
|
||||
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, signature.SignServiceMessage(&pk.PrivateKey, req))
|
||||
ir.keys = append(ir.keys, pk.PublicKey().Bytes())
|
||||
|
||||
resp, err := apeSrv.SetExtendedACL(context.Background(), req)
|
||||
require.Nil(t, resp)
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
require.ErrorAs(t, err, &errAccessDenied)
|
||||
}
|
||||
|
||||
func testDenyGetContainerEACLForIRSessionToken(t *testing.T) {
|
||||
t.Parallel()
|
||||
srv := &srvStub{
|
||||
|
@ -845,17 +765,22 @@ func testDenyPutContainerForOthersSessionToken(t *testing.T) {
|
|||
keys: [][]byte{},
|
||||
}
|
||||
nm := &netmapStub{}
|
||||
frostfsIDSubjectReader := &frostfsidStub{
|
||||
subjects: map[util.Uint160]*client.Subject{},
|
||||
}
|
||||
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
|
||||
|
||||
testContainer := containertest.Container()
|
||||
owner := testContainer.Owner()
|
||||
ownerAddr, err := owner.ScriptHash()
|
||||
require.NoError(t, err)
|
||||
frostfsIDSubjectReader := &frostfsidStub{
|
||||
subjects: map[util.Uint160]*client.Subject{
|
||||
ownerAddr: {},
|
||||
},
|
||||
}
|
||||
apeSrv := NewAPEServer(router, contRdr, ir, nm, frostfsIDSubjectReader, srv)
|
||||
|
||||
nm.currentEpoch = 100
|
||||
nm.netmaps = map[uint64]*netmap.NetMap{}
|
||||
|
||||
_, _, err := router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.NamespaceTarget(""), &chain.Chain{
|
||||
_, _, err = router.MorphRuleChainStorage().AddMorphRuleChain(chain.Ingress, engine.NamespaceTarget(""), &chain.Chain{
|
||||
Rules: []chain.Rule{
|
||||
{
|
||||
Status: chain.AccessDenied,
|
||||
|
@ -1229,11 +1154,6 @@ type srvStub struct {
|
|||
calls map[string]int
|
||||
}
|
||||
|
||||
func (s *srvStub) AnnounceUsedSpace(context.Context, *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||
s.calls["AnnounceUsedSpace"]++
|
||||
return &container.AnnounceUsedSpaceResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *srvStub) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||
s.calls["Delete"]++
|
||||
return &container.DeleteResponse{}, nil
|
||||
|
@ -1259,11 +1179,6 @@ func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutRes
|
|||
return &container.PutResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *srvStub) SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||
s.calls["SetExtendedACL"]++
|
||||
return &container.SetExtendedACLResponse{}, nil
|
||||
}
|
||||
|
||||
type irStub struct {
|
||||
keys [][]byte
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
container_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -29,24 +28,6 @@ func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Serv
|
|||
}
|
||||
}
|
||||
|
||||
// AnnounceUsedSpace implements Server.
|
||||
func (a *auditService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||
res, err := a.next.AnnounceUsedSpace(ctx, req)
|
||||
if !a.enabled.Load() {
|
||||
return res, err
|
||||
}
|
||||
|
||||
var ids []*refs.ContainerID
|
||||
for _, v := range req.GetBody().GetAnnouncements() {
|
||||
ids = append(ids, v.GetContainerID())
|
||||
}
|
||||
|
||||
audit.LogRequest(a.log, container_grpc.ContainerService_AnnounceUsedSpace_FullMethodName, req,
|
||||
audit.TargetFromRefs(ids, &cid.ID{}), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
// Delete implements Server.
|
||||
func (a *auditService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||
res, err := a.next.Delete(ctx, req)
|
||||
|
@ -103,14 +84,3 @@ func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*con
|
|||
audit.TargetFromRef(res.GetBody().GetContainerID(), &cid.ID{}), err == nil)
|
||||
return res, err
|
||||
}
|
||||
|
||||
// SetExtendedACL implements Server.
|
||||
func (a *auditService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||
res, err := a.next.SetExtendedACL(ctx, req)
|
||||
if !a.enabled.Load() {
|
||||
return res, err
|
||||
}
|
||||
audit.LogRequest(a.log, container_grpc.ContainerService_SetExtendedACL_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetEACL().GetContainerID(), &cid.ID{}), err == nil)
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ type ServiceExecutor interface {
|
|||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||
SetExtendedACL(context.Context, *session.Token, *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error)
|
||||
GetExtendedACL(context.Context, *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error)
|
||||
}
|
||||
|
||||
|
@ -96,24 +95,6 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||
meta := req.GetMetaHeader()
|
||||
for origin := meta.GetOrigin(); origin != nil; origin = meta.GetOrigin() {
|
||||
meta = origin
|
||||
}
|
||||
|
||||
respBody, err := s.exec.SetExtendedACL(ctx, meta.GetSessionToken(), req.GetBody())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not execute SetEACL request: %w", err)
|
||||
}
|
||||
|
||||
resp := new(container.SetExtendedACLResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
|
||||
respBody, err := s.exec.GetExtendedACL(ctx, req.GetBody())
|
||||
if err != nil {
|
||||
|
|
|
@ -13,8 +13,6 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
var errMissingUserID = errors.New("missing user ID")
|
||||
|
@ -204,10 +202,6 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (s *morphExecutor) SetExtendedACL(_ context.Context, _ *sessionV2.Token, _ *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SetExtendedACL not implemented")
|
||||
}
|
||||
|
||||
func (s *morphExecutor) GetExtendedACL(_ context.Context, body *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error) {
|
||||
idV2 := body.GetContainerID()
|
||||
if idV2 == nil {
|
||||
|
|
|
@ -12,7 +12,5 @@ type Server interface {
|
|||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||
SetExtendedACL(context.Context, *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error)
|
||||
GetExtendedACL(context.Context, *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error)
|
||||
AnnounceUsedSpace(context.Context, *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error)
|
||||
}
|
||||
|
|
|
@ -57,15 +57,6 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(container.SetExtendedACLResponse)
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
resp, err := util.EnsureNonNilResponse(s.svc.SetExtendedACL(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(container.GetExtendedACLResponse)
|
||||
|
@ -74,12 +65,3 @@ func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExte
|
|||
resp, err := util.EnsureNonNilResponse(s.svc.GetExtendedACL(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(container.AnnounceUsedSpaceResponse)
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
resp, err := util.EnsureNonNilResponse(s.svc.AnnounceUsedSpace(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
func (r *request) assemble(ctx context.Context) {
|
||||
if !r.canAssemble() {
|
||||
if !r.canAssembleComplexObject() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
|
|||
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||
|
||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
|
||||
|
||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() && r.isLocal() {
|
||||
if r.isRaw() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
}
|
||||
|
||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
|
|
@ -19,6 +19,7 @@ type assembler struct {
|
|||
splitInfo *objectSDK.SplitInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
head bool
|
||||
|
||||
currentOffset uint64
|
||||
|
||||
|
@ -30,18 +31,23 @@ func newAssembler(
|
|||
splitInfo *objectSDK.SplitInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter,
|
||||
head bool,
|
||||
) *assembler {
|
||||
return &assembler{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
splitInfo: splitInfo,
|
||||
objGetter: objGetter,
|
||||
head: head,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
if a.head {
|
||||
return a.assembleHeader(ctx, writer)
|
||||
}
|
||||
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
|
||||
if !ok {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
|
@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
|
|||
return a.parentObject, nil
|
||||
}
|
||||
|
||||
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
var sourceObjectIDs []oid.ID
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
sourceObjectID, ok = a.splitInfo.LastPart()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
if len(sourceObjectIDs) == 0 {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
for _, sourceObjectID = range sourceObjectIDs {
|
||||
obj, err := a.getParent(ctx, sourceObjectID, writer)
|
||||
if err == nil {
|
||||
return obj, nil
|
||||
}
|
||||
}
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
|
||||
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parent := obj.Parent()
|
||||
if parent == nil {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
if err := writer.WriteHeader(ctx, parent); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
|
|
|
@ -37,7 +37,6 @@ type assemblerec struct {
|
|||
cs container.Source
|
||||
log *logger.Logger
|
||||
head bool
|
||||
raw bool
|
||||
traverserGenerator traverserGenerator
|
||||
epoch uint64
|
||||
}
|
||||
|
@ -51,7 +50,6 @@ func newAssemblerEC(
|
|||
cs container.Source,
|
||||
log *logger.Logger,
|
||||
head bool,
|
||||
raw bool,
|
||||
tg traverserGenerator,
|
||||
epoch uint64,
|
||||
) *assemblerec {
|
||||
|
@ -64,7 +62,6 @@ func newAssemblerEC(
|
|||
cs: cs,
|
||||
log: log,
|
||||
head: head,
|
||||
raw: raw,
|
||||
traverserGenerator: tg,
|
||||
epoch: epoch,
|
||||
}
|
||||
|
@ -74,9 +71,6 @@ func newAssemblerEC(
|
|||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
switch {
|
||||
case a.raw:
|
||||
err := a.reconstructRawError(ctx)
|
||||
return nil, err
|
||||
case a.head:
|
||||
return a.reconstructHeader(ctx, writer)
|
||||
case a.rng != nil:
|
||||
|
@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
|
|||
return c.Reconstruct(parts)
|
||||
}
|
||||
|
||||
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
|
||||
chunks := make(map[string]objectSDK.ECChunk)
|
||||
var chunksGuard sync.Mutex
|
||||
for _, ch := range a.ecInfo.localChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
|
||||
objID := a.addr.Object()
|
||||
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
batch := trav.Next()
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
for _, node := range batch {
|
||||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
|
||||
return nil
|
||||
}
|
||||
|
||||
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
|
||||
|
||||
chunksGuard.Lock()
|
||||
defer chunksGuard.Unlock()
|
||||
for _, ch := range nodeChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
if err = eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return createECInfoErr(chunks)
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
||||
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||
|
@ -293,7 +237,7 @@ func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch object
|
|||
return nil
|
||||
}
|
||||
var addr oid.Address
|
||||
addr.SetContainer(addr.Container())
|
||||
addr.SetContainer(a.addr.Container())
|
||||
addr.SetObject(objID)
|
||||
var object *objectSDK.Object
|
||||
if a.head {
|
||||
|
@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
|
|||
}
|
||||
return object
|
||||
}
|
||||
|
||||
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
|
||||
info := objectSDK.NewECInfo()
|
||||
for _, ch := range chunks {
|
||||
info.AddChunk(ch)
|
||||
}
|
||||
return objectSDK.NewECInfoError(info)
|
||||
}
|
||||
|
|
|
@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
if exec.isRaw() && execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
}
|
||||
exec.assembleEC(ctx)
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
|
|
|
@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
|
||||
t.Run("VIRTUAL", func(t *testing.T) {
|
||||
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
|
||||
headPrm := newHeadPrm(false, nil)
|
||||
headPrm := newHeadPrm(true, nil)
|
||||
headPrm.WithAddress(addr)
|
||||
|
||||
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
|
||||
|
|
|
@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
if r.status != statusEC {
|
||||
// for raw requests, continue to collect other parts
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
}
|
||||
return false
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.collectedObject = obj
|
||||
r.writeCollectedObject(ctx)
|
||||
}
|
||||
return true
|
||||
case errors.As(err, &errRemoved):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
return true
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
return true
|
||||
case errors.As(err, &errSplitInfo):
|
||||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
return true
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
||||
if r.isRaw() {
|
||||
return false // continue to collect all parts
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
}
|
||||
|
||||
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||
|
|
|
@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
|
|||
return r.keyStore.GetKey(sessionInfo)
|
||||
}
|
||||
|
||||
func (r *request) canAssemble() bool {
|
||||
return !r.isRaw() && !r.headOnly()
|
||||
func (r *request) canAssembleComplexObject() bool {
|
||||
return !r.isRaw()
|
||||
}
|
||||
|
||||
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
@ -39,7 +40,7 @@ type ecWriter struct {
|
|||
}
|
||||
|
||||
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
relayed, err := e.relayIfNotContainerNode(ctx)
|
||||
relayed, err := e.relayIfNotContainerNode(ctx, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -65,7 +66,7 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
|||
return e.writeRawObject(ctx, obj)
|
||||
}
|
||||
|
||||
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
|
||||
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||
if e.relay == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -77,7 +78,13 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
|
|||
// object can be splitted or saved local
|
||||
return false, nil
|
||||
}
|
||||
if err := e.relayToContainerNode(ctx); err != nil {
|
||||
objID := object.AddressOf(obj).Object()
|
||||
var index uint32
|
||||
if obj.ECHeader() != nil {
|
||||
objID = obj.ECHeader().Parent()
|
||||
index = obj.ECHeader().Index()
|
||||
}
|
||||
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
|
@ -102,18 +109,20 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
||||
t, err := placement.NewTraverser(e.placementOpts...)
|
||||
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var lastErr error
|
||||
offset := int(index)
|
||||
for {
|
||||
nodes := t.Next()
|
||||
if len(nodes) == 0 {
|
||||
break
|
||||
}
|
||||
for _, node := range nodes {
|
||||
for idx := range nodes {
|
||||
node := nodes[(idx+offset)%len(nodes)]
|
||||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
|
||||
|
@ -149,6 +158,10 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if e.commonPrm.LocalOnly() {
|
||||
return e.writePartLocal(ctx, obj)
|
||||
}
|
||||
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -229,6 +230,9 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
if len(copiesNumber) > 0 && !result.isEC {
|
||||
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
|
||||
}
|
||||
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
||||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||
|
||||
objID, ok := obj.ID()
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
|
@ -212,10 +213,10 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
|||
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
|
||||
return p.newECWriter(prm)
|
||||
}
|
||||
return p.newDefaultObjectWriter(prm)
|
||||
return p.newDefaultObjectWriter(prm, false)
|
||||
}
|
||||
|
||||
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
|
||||
var relay func(context.Context, nodeDesc) error
|
||||
if p.relay != nil {
|
||||
relay = func(ctx context.Context, node nodeDesc) error {
|
||||
|
@ -232,9 +233,16 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWri
|
|||
}
|
||||
}
|
||||
|
||||
traverseOpts := prm.traverseOpts
|
||||
if forECPlacement && !prm.common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||
}
|
||||
|
||||
return &distributedTarget{
|
||||
cfg: p.cfg,
|
||||
placementOpts: prm.traverseOpts,
|
||||
placementOpts: traverseOpts,
|
||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||
if node.local {
|
||||
return localTarget{
|
||||
|
@ -266,7 +274,7 @@ func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
|||
commonPrm: prm.common,
|
||||
relay: p.relay,
|
||||
},
|
||||
repWriter: p.newDefaultObjectWriter(prm),
|
||||
repWriter: p.newDefaultObjectWriter(prm, true),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue