Strict APE check for EC & fix sign EC part put requests #1451
|
@ -67,9 +67,6 @@ type Prm struct {
|
|||
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
|
||||
SoftAPECheck bool
|
||||
|
||||
// If true, object headers will not retrieved from storage engine.
|
||||
WithoutHeaderRequest bool
|
||||
|
||||
// The request's bearer token. It is used in order to check APE overrides with the token.
|
||||
BearerToken *bearer.Token
|
||||
|
||||
|
|
|
@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
|
|||
nm := &netmapStub{
|
||||
currentEpoch: 100,
|
||||
netmaps: map[uint64]*netmapSDK.NetMap{
|
||||
99: netmap,
|
||||
|
||||
100: netmap,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package ape
|
|||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -24,6 +26,8 @@ import (
|
|||
|
||||
var defaultRequest = aperequest.Request{}
|
||||
|
||||
var errECMissingParentObjectID = errors.New("missing EC parent object ID")
|
||||
|
||||
func nativeSchemaRole(role acl.Role) string {
|
||||
switch role {
|
||||
case acl.RoleOwner:
|
||||
|
@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
|||
var header *objectV2.Header
|
||||
if prm.Header != nil {
|
||||
header = prm.Header
|
||||
} else if prm.Object != nil && !prm.WithoutHeaderRequest {
|
||||
} else if prm.Object != nil {
|
||||
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
|
||||
if err == nil {
|
||||
header = headerObjSDK.ToV2().GetHeader()
|
||||
}
|
||||
}
|
||||
dstepanov-yadro
commented
It looks even too strict if you look at the line above:
It looks even too strict if you look at the line above:
```
} else if prm.Object != nil {
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
if err == nil {
header = headerObjSDK.ToV2().GetHeader()
}
}
```
|
||||
header = c.fillHeaderWithECParent(ctx, prm, header)
|
||||
header, err := c.fillHeaderWithECParent(ctx, prm, header)
|
||||
if err != nil {
|
||||
return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
|
||||
}
|
||||
reqProps := map[string]string{
|
||||
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
|
||||
nativeschema.PropertyKeyActorRole: prm.Role,
|
||||
|
@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
|||
reqProps[xheadKey] = xhead.GetValue()
|
||||
}
|
||||
|
||||
var err error
|
||||
reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
|
||||
if err != nil {
|
||||
return defaultRequest, err
|
||||
|
@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
|||
), nil
|
||||
}
|
||||
|
||||
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header {
|
||||
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
|
||||
fyrchik
commented
I do not completely understand this function. I do not completely understand this function.
In what circumstances do we return `header` and not parent header and, more importantly, why?
dstepanov-yadro
commented
1. if it is not EC chunk
2. if current node is IR or container node: APE skips check for inside-container requests; not sure about the same for IR, maybe should delete it
3. if parent object does not exist
4. if current node is not container node: in this case node has no permissions to collect EC object
fyrchik
commented
If ape skips checks for in-container requests, why do we call this function at all? If ape skips checks for in-container requests, why do we _call_ this function at all?
dstepanov-yadro
commented
Oh, mistake. It must be:
Then container node tries to collect headers if request is done be owner or other user. Oh, mistake. It must be:
```
2. if request is from IR or container node: APE skips check for inside-container requests; not sure about the same for IR, maybe
```
Then container node tries to collect headers if request is done be owner or other user.
fyrchik
commented
Again, if APE skips check, why do we call this function? Again, if APE skips check, why do we call this function?
It doesn't seem we signify the "skip" part with our return value in any way.
dstepanov-yadro
commented
Removed pt.2 as it is checker before this function call. Removed pt.2 as it is checker before this function call.
|
||||
if header == nil {
|
||||
return header
|
||||
return header, nil
|
||||
}
|
||||
if header.GetEC() == nil {
|
||||
return header
|
||||
}
|
||||
if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
|
||||
prm.Role == nativeschema.PropertyValueContainerRoleIR {
|
||||
return header
|
||||
return header, nil
|
||||
}
|
||||
parentObjRefID := header.GetEC().Parent
|
||||
if parentObjRefID == nil {
|
||||
return header
|
||||
return nil, errECMissingParentObjectID
|
||||
}
|
||||
var parentObjID oid.ID
|
||||
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
|
||||
return header
|
||||
return nil, fmt.Errorf("EC parent object ID format error: %w", err)
|
||||
}
|
||||
// only container node have access to collect parent object
|
||||
contNode, err := c.currentNodeIsContainerNode(prm.Container)
|
||||
if err != nil || !contNode {
|
||||
return header
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("check container node status: %w", err)
|
||||
}
|
||||
if !contNode {
|
||||
return header, nil
|
||||
}
|
||||
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
|
||||
if err != nil {
|
||||
return header
|
||||
if isLogicalError(err) {
|
||||
return header, nil
|
||||
}
|
||||
return parentObj.ToV2().GetHeader()
|
||||
return nil, fmt.Errorf("EC parent header request: %w", err)
|
||||
}
|
||||
return parentObj.ToV2().GetHeader(), nil
|
||||
}
|
||||
|
||||
func isLogicalError(err error) bool {
|
||||
var errObjRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errObjNotFound *apistatus.ObjectNotFound
|
||||
return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
|
||||
}
|
||||
|
||||
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {
|
||||
|
|
|
@ -25,7 +25,10 @@ import (
|
|||
|
||||
var _ transformer.ObjectWriter = (*ECWriter)(nil)
|
||||
|
||||
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||
var (
|
||||
errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||
errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
|
||||
)
|
||||
|
||||
type ECWriter struct {
|
||||
Config *Config
|
||||
|
@ -37,10 +40,12 @@ type ECWriter struct {
|
|||
|
||||
ObjectMeta object.ContentMeta
|
||||
ObjectMetaValid bool
|
||||
|
||||
remoteRequestSignKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
relayed, err := e.relayIfNotContainerNode(ctx, obj)
|
||||
relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
|||
e.ObjectMetaValid = true
|
||||
}
|
||||
dstepanov-yadro
commented
Restore tokens as in case of complex object use session token to store linking object (as it was before) Restore tokens as in case of complex object use session token to store linking object (as it was before)
|
||||
|
||||
if isContainerNode {
|
||||
restoreTokens := e.CommonPrm.ForgetTokens()
|
||||
defer restoreTokens()
|
||||
// As request executed on container node, so sign request with container key.
|
||||
e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
|
||||
aarifullin
commented
Let's consider a situation: Let's consider a situation:
`ec-container: s01, s03, s04`. We're sending a request signed by `wallet.json`. We pass through APE-check for `s01`, _resign_ the request by `s01` key and send `writeECPart` to `s03, s04`.
Imagine that at this time interval (30sec before) a local override has been added to `s03` or `s04` to restrict putting object to this container - the local override is totally ignored due to this actor role change.
I don't consider a situation when an ape chain is present within `policy` contract because it'd reject the request before this point and that's OK
dstepanov-yadro
commented
I think it is ok as frostfs-storage is not strongly consistent system. I think it is ok as frostfs-storage is not strongly consistent system.
Also it depends on local override: if local override restricts put objects with owner, it will still work.
aarifullin
commented
It seems to be so
We won't get to the check at all: look at these lines Anyway, currently I am not requesting to change this. We just need to keep this in mind > I think it is ok as frostfs-storage is not strongly consistent system.
It seems to be so
> if local override restricts put objects with owner, it will still work
We won't get to the check at all: [look at these lines](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/services/object/ape/checker.go#L85-L86)
Anyway, currently I am not requesting to change this. We just need to keep this in mind
dstepanov-yadro
commented
Oh, right. Oh, right.
But as we discussed with @fyrchik we should try to check user's request once (on first container node now) to reduce unnecessary operations count.
aarifullin
commented
OK. I'll keep this comment open to not forget about this point in the future if we refer back to this PR OK. I'll keep this comment open to not forget about this point in the future if we refer back to this PR
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
e.remoteRequestSignKey = e.Key
|
||||
}
|
||||
|
||||
if obj.ECHeader() != nil {
|
||||
return e.writeECPart(ctx, obj)
|
||||
}
|
||||
return e.writeRawObject(ctx, obj)
|
||||
}
|
||||
|
||||
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||
if e.Relay == nil {
|
||||
return false, nil
|
||||
}
|
||||
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
|
||||
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, false, err
|
||||
}
|
||||
if currentNodeIsContainerNode {
|
||||
// object can be splitted or saved local
|
||||
return false, nil
|
||||
return false, true, nil
|
||||
}
|
||||
if e.Relay == nil {
|
||||
return false, currentNodeIsContainerNode, nil
|
||||
}
|
||||
objID := object.AddressOf(obj).Object()
|
||||
var index uint32
|
||||
|
@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
|
|||
index = obj.ECHeader().Index()
|
||||
}
|
||||
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
|
||||
return false, err
|
||||
return false, false, err
|
||||
}
|
||||
return true, nil
|
||||
return true, currentNodeIsContainerNode, nil
|
||||
}
|
||||
|
||||
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
|
||||
|
@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
singleErr: err,
|
||||
}
|
||||
}
|
||||
for idx := range partsProcessed {
|
||||
if !partsProcessed[idx].Load() {
|
||||
return errIncompletePut{
|
||||
singleErr: errFailedToSaveAllECParts,
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
|
|||
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
||||
|
||||
remoteTaget := remoteWriter{
|
||||
privateKey: e.Key,
|
||||
privateKey: e.remoteRequestSignKey,
|
||||
clientConstructor: e.Config.ClientConstructor,
|
||||
commonPrm: e.CommonPrm,
|
||||
nodeInfo: clientNodeInfo,
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
|
@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {
|
|||
|
||||
ownerKey, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
nodeKey, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||
require.NoError(t, err)
|
||||
|
@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
|
|||
RemotePool: pool,
|
||||
Logger: log,
|
||||
ClientConstructor: clientConstructor{vectors: ns},
|
||||
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
|
||||
},
|
||||
PlacementOpts: append(
|
||||
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||
|
|
|
@ -100,11 +100,18 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
|
|||
|
||||
// ForgetTokens forgets all the tokens read from the request's
|
||||
// meta information before.
|
||||
func (p *CommonPrm) ForgetTokens() {
|
||||
func (p *CommonPrm) ForgetTokens() func() {
|
||||
if p != nil {
|
||||
tk := p.token
|
||||
br := p.bearer
|
||||
p.token = nil
|
||||
p.bearer = nil
|
||||
return func() {
|
||||
p.token = tk
|
||||
p.bearer = br
|
||||
}
|
||||
}
|
||||
return func() {}
|
||||
}
|
||||
|
||||
func CommonPrmFromV2(req interface {
|
||||
|
|
|
@ -3,6 +3,7 @@ package placement
|
|||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
|
|||
raw, ok := c.containerCache.Get(cnr)
|
||||
c.mtx.Unlock()
|
||||
if ok {
|
||||
return raw, nil
|
||||
return c.cloneResult(raw), nil
|
||||
}
|
||||
} else {
|
||||
c.lastEpoch = nm.Epoch()
|
||||
|
@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
|
|||
c.containerCache.Add(cnr, cn)
|
||||
}
|
||||
c.mtx.Unlock()
|
||||
return cn, nil
|
||||
return c.cloneResult(cn), nil
|
||||
}
|
||||
|
||||
func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
|
||||
result := make([][]netmapSDK.NodeInfo, len(nodes))
|
||||
for repIdx := range nodes {
|
||||
fyrchik
commented
`slices.Clone`?
dstepanov-yadro
commented
fixed fixed
|
||||
result[repIdx] = slices.Clone(nodes[repIdx])
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
Why this change is necessary?
previously, checking that the node is a container node ignored the error that netmap is not defined for previous epoch