Strict APE check for EC & fix sign EC part put requests #1451

Merged
fyrchik merged 5 commits from dstepanov-yadro/frostfs-node:fix/ec_ape_strict into master 2024-11-06 08:18:11 +00:00
7 changed files with 89 additions and 33 deletions

View file

@ -67,9 +67,6 @@ type Prm struct {
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow. // If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
SoftAPECheck bool SoftAPECheck bool
// If true, object headers will not retrieved from storage engine.
WithoutHeaderRequest bool
// The request's bearer token. It is used in order to check APE overrides with the token. // The request's bearer token. It is used in order to check APE overrides with the token.
BearerToken *bearer.Token BearerToken *bearer.Token

View file

@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
nm := &netmapStub{ nm := &netmapStub{
currentEpoch: 100, currentEpoch: 100,
netmaps: map[uint64]*netmapSDK.NetMap{ netmaps: map[uint64]*netmapSDK.NetMap{
99: netmap,
100: netmap, 100: netmap,
}, },
} }

View file

@ -3,6 +3,7 @@ package ape
import ( import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"errors"
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
@ -11,6 +12,7 @@ import (
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request" aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -24,6 +26,8 @@ import (
var defaultRequest = aperequest.Request{} var defaultRequest = aperequest.Request{}
var errECMissingParentObjectID = errors.New("missing EC parent object ID")
func nativeSchemaRole(role acl.Role) string { func nativeSchemaRole(role acl.Role) string {
switch role { switch role {
case acl.RoleOwner: case acl.RoleOwner:
@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
var header *objectV2.Header var header *objectV2.Header
if prm.Header != nil { if prm.Header != nil {
header = prm.Header header = prm.Header
} else if prm.Object != nil && !prm.WithoutHeaderRequest { } else if prm.Object != nil {
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true) headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
if err == nil { if err == nil {
header = headerObjSDK.ToV2().GetHeader() header = headerObjSDK.ToV2().GetHeader()
} }
} }
header = c.fillHeaderWithECParent(ctx, prm, header) header, err := c.fillHeaderWithECParent(ctx, prm, header)
if err != nil {
return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
}
reqProps := map[string]string{ reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey, nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
nativeschema.PropertyKeyActorRole: prm.Role, nativeschema.PropertyKeyActorRole: prm.Role,
@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
reqProps[xheadKey] = xhead.GetValue() reqProps[xheadKey] = xhead.GetValue()
} }
var err error
reqProps, err = c.fillWithUserClaimTags(reqProps, prm) reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
if err != nil { if err != nil {
return defaultRequest, err return defaultRequest, err
@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
), nil ), nil
} }
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header { func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
if header == nil { if header == nil {
return header return header, nil
} }
if header.GetEC() == nil { if header.GetEC() == nil {
return header return header, nil
}
if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
prm.Role == nativeschema.PropertyValueContainerRoleIR {
return header
} }
parentObjRefID := header.GetEC().Parent parentObjRefID := header.GetEC().Parent
if parentObjRefID == nil { if parentObjRefID == nil {
return header return nil, errECMissingParentObjectID
} }
var parentObjID oid.ID var parentObjID oid.ID
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil { if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
return header return nil, fmt.Errorf("EC parent object ID format error: %w", err)
} }
// only container node have access to collect parent object // only container node have access to collect parent object
contNode, err := c.currentNodeIsContainerNode(prm.Container) contNode, err := c.currentNodeIsContainerNode(prm.Container)
if err != nil || !contNode { if err != nil {
return header return nil, fmt.Errorf("check container node status: %w", err)
}
if !contNode {
return header, nil
} }
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false) parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
if err != nil { if err != nil {
return header if isLogicalError(err) {
return header, nil
} }
return parentObj.ToV2().GetHeader() return nil, fmt.Errorf("EC parent header request: %w", err)
}
return parentObj.ToV2().GetHeader(), nil
}
func isLogicalError(err error) bool {
var errObjRemoved *apistatus.ObjectAlreadyRemoved
var errObjNotFound *apistatus.ObjectNotFound
return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
} }
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) { func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {

View file

@ -25,7 +25,10 @@ import (
var _ transformer.ObjectWriter = (*ECWriter)(nil) var _ transformer.ObjectWriter = (*ECWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding") var (
errUnsupportedECObject = errors.New("object is not supported for erasure coding")
errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
)
type ECWriter struct { type ECWriter struct {
Config *Config Config *Config
@ -37,10 +40,12 @@ type ECWriter struct {
ObjectMeta object.ContentMeta ObjectMeta object.ContentMeta
ObjectMetaValid bool ObjectMetaValid bool
remoteRequestSignKey *ecdsa.PrivateKey
} }
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj) relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil { if err != nil {
return err return err
} }
@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
e.ObjectMetaValid = true e.ObjectMetaValid = true
} }
if isContainerNode {
restoreTokens := e.CommonPrm.ForgetTokens()
defer restoreTokens()
// As request executed on container node, so sign request with container key.
e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
if err != nil {
return err
}
} else {
e.remoteRequestSignKey = e.Key
}
if obj.ECHeader() != nil { if obj.ECHeader() != nil {
return e.writeECPart(ctx, obj) return e.writeECPart(ctx, obj)
} }
return e.writeRawObject(ctx, obj) return e.writeRawObject(ctx, obj)
} }
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) { func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
if e.Relay == nil {
return false, nil
}
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode() currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
if err != nil { if err != nil {
return false, err return false, false, err
} }
if currentNodeIsContainerNode { if currentNodeIsContainerNode {
// object can be splitted or saved local // object can be splitted or saved local
return false, nil return false, true, nil
}
if e.Relay == nil {
return false, currentNodeIsContainerNode, nil
} }
objID := object.AddressOf(obj).Object() objID := object.AddressOf(obj).Object()
var index uint32 var index uint32
@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
index = obj.ECHeader().Index() index = obj.ECHeader().Index()
} }
if err := e.relayToContainerNode(ctx, objID, index); err != nil { if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err return false, false, err
} }
return true, nil return true, currentNodeIsContainerNode, nil
} }
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) { func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
singleErr: err, singleErr: err,
} }
} }
for idx := range partsProcessed {
if !partsProcessed[idx].Load() {
return errIncompletePut{
singleErr: errFailedToSaveAllECParts,
}
}
}
return nil return nil
} }
@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
client.NodeInfoFromNetmapElement(&clientNodeInfo, node) client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteWriter{ remoteTaget := remoteWriter{
privateKey: e.Key, privateKey: e.remoteRequestSignKey,
clientConstructor: e.Config.ClientConstructor, clientConstructor: e.Config.ClientConstructor,
commonPrm: e.CommonPrm, commonPrm: e.CommonPrm,
nodeInfo: clientNodeInfo, nodeInfo: clientNodeInfo,

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {
ownerKey, err := keys.NewPrivateKey() ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err) require.NoError(t, err)
nodeKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true)) pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err) require.NoError(t, err)
@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
RemotePool: pool, RemotePool: pool,
Logger: log, Logger: log,
ClientConstructor: clientConstructor{vectors: ns}, ClientConstructor: clientConstructor{vectors: ns},
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
}, },
PlacementOpts: append( PlacementOpts: append(
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)}, []placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},

View file

@ -100,11 +100,18 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
// ForgetTokens forgets all the tokens read from the request's // ForgetTokens forgets all the tokens read from the request's
// meta information before. // meta information before.
func (p *CommonPrm) ForgetTokens() { func (p *CommonPrm) ForgetTokens() func() {
if p != nil { if p != nil {
tk := p.token
br := p.bearer
p.token = nil p.token = nil
p.bearer = nil p.bearer = nil
return func() {
p.token = tk
p.bearer = br
} }
}
return func() {}
} }
func CommonPrmFromV2(req interface { func CommonPrmFromV2(req interface {

View file

@ -3,6 +3,7 @@ package placement
import ( import (
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"slices"
"sync" "sync"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
raw, ok := c.containerCache.Get(cnr) raw, ok := c.containerCache.Get(cnr)
c.mtx.Unlock() c.mtx.Unlock()
if ok { if ok {
return raw, nil return c.cloneResult(raw), nil
} }
} else { } else {
c.lastEpoch = nm.Epoch() c.lastEpoch = nm.Epoch()
@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
c.containerCache.Add(cnr, cn) c.containerCache.Add(cnr, cn)
} }
c.mtx.Unlock() c.mtx.Unlock()
return cn, nil return c.cloneResult(cn), nil
}
func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
result := make([][]netmapSDK.NodeInfo, len(nodes))
for repIdx := range nodes {
result[repIdx] = slices.Clone(nodes[repIdx])
}
return result
} }