[#1451] ape: Perform strict APE checks for EC parts

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-28 15:46:38 +03:00 committed by Evgenii Stratonikov
parent 9902965ff4
commit 5b1ba8e23d
2 changed files with 30 additions and 15 deletions

View file

@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
nm := &netmapStub{ nm := &netmapStub{
currentEpoch: 100, currentEpoch: 100,
netmaps: map[uint64]*netmapSDK.NetMap{ netmaps: map[uint64]*netmapSDK.NetMap{
99: netmap,
100: netmap, 100: netmap,
}, },
} }

View file

@ -3,6 +3,7 @@ package ape
import ( import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"errors"
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
@ -11,6 +12,7 @@ import (
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request" aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -24,6 +26,8 @@ import (
var defaultRequest = aperequest.Request{} var defaultRequest = aperequest.Request{}
var errECMissingParentObjectID = errors.New("missing EC parent object ID")
func nativeSchemaRole(role acl.Role) string { func nativeSchemaRole(role acl.Role) string {
switch role { switch role {
case acl.RoleOwner: case acl.RoleOwner:
@ -122,7 +126,10 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
header = headerObjSDK.ToV2().GetHeader() header = headerObjSDK.ToV2().GetHeader()
} }
} }
header = c.fillHeaderWithECParent(ctx, prm, header) header, err := c.fillHeaderWithECParent(ctx, prm, header)
if err != nil {
return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
}
reqProps := map[string]string{ reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey, nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
nativeschema.PropertyKeyActorRole: prm.Role, nativeschema.PropertyKeyActorRole: prm.Role,
@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
reqProps[xheadKey] = xhead.GetValue() reqProps[xheadKey] = xhead.GetValue()
} }
var err error
reqProps, err = c.fillWithUserClaimTags(reqProps, prm) reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
if err != nil { if err != nil {
return defaultRequest, err return defaultRequest, err
@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
), nil ), nil
} }
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header { func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
if header == nil { if header == nil {
return header return header, nil
} }
if header.GetEC() == nil { if header.GetEC() == nil {
return header return header, nil
}
if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
prm.Role == nativeschema.PropertyValueContainerRoleIR {
return header
} }
parentObjRefID := header.GetEC().Parent parentObjRefID := header.GetEC().Parent
if parentObjRefID == nil { if parentObjRefID == nil {
return header return nil, errECMissingParentObjectID
} }
var parentObjID oid.ID var parentObjID oid.ID
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil { if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
return header return nil, fmt.Errorf("EC parent object ID format error: %w", err)
} }
// only container node have access to collect parent object // only container node have access to collect parent object
contNode, err := c.currentNodeIsContainerNode(prm.Container) contNode, err := c.currentNodeIsContainerNode(prm.Container)
if err != nil || !contNode { if err != nil {
return header return nil, fmt.Errorf("check container node status: %w", err)
}
if !contNode {
return header, nil
} }
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false) parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
if err != nil { if err != nil {
return header if isLogicalError(err) {
return header, nil
} }
return parentObj.ToV2().GetHeader() return nil, fmt.Errorf("EC parent header request: %w", err)
}
return parentObj.ToV2().GetHeader(), nil
}
func isLogicalError(err error) bool {
var errObjRemoved *apistatus.ObjectAlreadyRemoved
var errObjNotFound *apistatus.ObjectNotFound
return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
} }
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) { func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {