Strict APE check for EC & fix sign EC part put requests #1451
7 changed files with 89 additions and 33 deletions
|
@ -67,9 +67,6 @@ type Prm struct {
|
||||||
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
|
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
|
||||||
SoftAPECheck bool
|
SoftAPECheck bool
|
||||||
|
|
||||||
// If true, object headers will not retrieved from storage engine.
|
|
||||||
WithoutHeaderRequest bool
|
|
||||||
|
|
||||||
// The request's bearer token. It is used in order to check APE overrides with the token.
|
// The request's bearer token. It is used in order to check APE overrides with the token.
|
||||||
BearerToken *bearer.Token
|
BearerToken *bearer.Token
|
||||||
|
|
||||||
|
|
|
@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
|
||||||
nm := &netmapStub{
|
nm := &netmapStub{
|
||||||
currentEpoch: 100,
|
currentEpoch: 100,
|
||||||
netmaps: map[uint64]*netmapSDK.NetMap{
|
netmaps: map[uint64]*netmapSDK.NetMap{
|
||||||
|
99: netmap,
|
||||||
100: netmap,
|
100: netmap,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package ape
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -11,6 +12,7 @@ import (
|
||||||
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
|
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -24,6 +26,8 @@ import (
|
||||||
|
|
||||||
var defaultRequest = aperequest.Request{}
|
var defaultRequest = aperequest.Request{}
|
||||||
|
|
||||||
|
var errECMissingParentObjectID = errors.New("missing EC parent object ID")
|
||||||
|
|
||||||
func nativeSchemaRole(role acl.Role) string {
|
func nativeSchemaRole(role acl.Role) string {
|
||||||
switch role {
|
switch role {
|
||||||
case acl.RoleOwner:
|
case acl.RoleOwner:
|
||||||
|
@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
||||||
var header *objectV2.Header
|
var header *objectV2.Header
|
||||||
if prm.Header != nil {
|
if prm.Header != nil {
|
||||||
header = prm.Header
|
header = prm.Header
|
||||||
} else if prm.Object != nil && !prm.WithoutHeaderRequest {
|
} else if prm.Object != nil {
|
||||||
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
|
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
header = headerObjSDK.ToV2().GetHeader()
|
header = headerObjSDK.ToV2().GetHeader()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
header = c.fillHeaderWithECParent(ctx, prm, header)
|
header, err := c.fillHeaderWithECParent(ctx, prm, header)
|
||||||
|
if err != nil {
|
||||||
|
return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
|
||||||
|
}
|
||||||
reqProps := map[string]string{
|
reqProps := map[string]string{
|
||||||
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
|
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
|
||||||
nativeschema.PropertyKeyActorRole: prm.Role,
|
nativeschema.PropertyKeyActorRole: prm.Role,
|
||||||
|
@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
||||||
reqProps[xheadKey] = xhead.GetValue()
|
reqProps[xheadKey] = xhead.GetValue()
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
|
reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return defaultRequest, err
|
return defaultRequest, err
|
||||||
|
@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
||||||
), nil
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header {
|
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
|
||||||
if header == nil {
|
if header == nil {
|
||||||
return header
|
return header, nil
|
||||||
}
|
}
|
||||||
if header.GetEC() == nil {
|
if header.GetEC() == nil {
|
||||||
return header
|
return header, nil
|
||||||
}
|
|
||||||
if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
|
|
||||||
prm.Role == nativeschema.PropertyValueContainerRoleIR {
|
|
||||||
return header
|
|
||||||
}
|
}
|
||||||
parentObjRefID := header.GetEC().Parent
|
parentObjRefID := header.GetEC().Parent
|
||||||
if parentObjRefID == nil {
|
if parentObjRefID == nil {
|
||||||
return header
|
return nil, errECMissingParentObjectID
|
||||||
}
|
}
|
||||||
var parentObjID oid.ID
|
var parentObjID oid.ID
|
||||||
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
|
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
|
||||||
return header
|
return nil, fmt.Errorf("EC parent object ID format error: %w", err)
|
||||||
}
|
}
|
||||||
// only container node have access to collect parent object
|
// only container node have access to collect parent object
|
||||||
contNode, err := c.currentNodeIsContainerNode(prm.Container)
|
contNode, err := c.currentNodeIsContainerNode(prm.Container)
|
||||||
if err != nil || !contNode {
|
if err != nil {
|
||||||
return header
|
return nil, fmt.Errorf("check container node status: %w", err)
|
||||||
|
}
|
||||||
|
if !contNode {
|
||||||
|
return header, nil
|
||||||
}
|
}
|
||||||
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
|
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return header
|
if isLogicalError(err) {
|
||||||
|
return header, nil
|
||||||
}
|
}
|
||||||
return parentObj.ToV2().GetHeader()
|
return nil, fmt.Errorf("EC parent header request: %w", err)
|
||||||
|
}
|
||||||
|
return parentObj.ToV2().GetHeader(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isLogicalError(err error) bool {
|
||||||
|
var errObjRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
|
var errObjNotFound *apistatus.ObjectNotFound
|
||||||
|
return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {
|
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {
|
||||||
|
|
|
@ -25,7 +25,10 @@ import (
|
||||||
|
|
||||||
var _ transformer.ObjectWriter = (*ECWriter)(nil)
|
var _ transformer.ObjectWriter = (*ECWriter)(nil)
|
||||||
|
|
||||||
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
var (
|
||||||
|
errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||||
|
errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
|
||||||
|
)
|
||||||
|
|
||||||
type ECWriter struct {
|
type ECWriter struct {
|
||||||
Config *Config
|
Config *Config
|
||||||
|
@ -37,10 +40,12 @@ type ECWriter struct {
|
||||||
|
|
||||||
ObjectMeta object.ContentMeta
|
ObjectMeta object.ContentMeta
|
||||||
ObjectMetaValid bool
|
ObjectMetaValid bool
|
||||||
|
|
||||||
|
remoteRequestSignKey *ecdsa.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
relayed, err := e.relayIfNotContainerNode(ctx, obj)
|
relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
||||||
e.ObjectMetaValid = true
|
e.ObjectMetaValid = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if isContainerNode {
|
||||||
|
restoreTokens := e.CommonPrm.ForgetTokens()
|
||||||
|
defer restoreTokens()
|
||||||
|
// As request executed on container node, so sign request with container key.
|
||||||
|
e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
e.remoteRequestSignKey = e.Key
|
||||||
|
}
|
||||||
|
|
||||||
if obj.ECHeader() != nil {
|
if obj.ECHeader() != nil {
|
||||||
return e.writeECPart(ctx, obj)
|
return e.writeECPart(ctx, obj)
|
||||||
}
|
}
|
||||||
return e.writeRawObject(ctx, obj)
|
return e.writeRawObject(ctx, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
|
||||||
if e.Relay == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
if currentNodeIsContainerNode {
|
if currentNodeIsContainerNode {
|
||||||
// object can be splitted or saved local
|
// object can be splitted or saved local
|
||||||
return false, nil
|
return false, true, nil
|
||||||
|
}
|
||||||
|
if e.Relay == nil {
|
||||||
|
return false, currentNodeIsContainerNode, nil
|
||||||
}
|
}
|
||||||
objID := object.AddressOf(obj).Object()
|
objID := object.AddressOf(obj).Object()
|
||||||
var index uint32
|
var index uint32
|
||||||
|
@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
|
||||||
index = obj.ECHeader().Index()
|
index = obj.ECHeader().Index()
|
||||||
}
|
}
|
||||||
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
|
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
|
||||||
return false, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, currentNodeIsContainerNode, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
|
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
|
||||||
|
@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
singleErr: err,
|
singleErr: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for idx := range partsProcessed {
|
||||||
|
if !partsProcessed[idx].Load() {
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: errFailedToSaveAllECParts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
|
||||||
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
||||||
|
|
||||||
remoteTaget := remoteWriter{
|
remoteTaget := remoteWriter{
|
||||||
privateKey: e.Key,
|
privateKey: e.remoteRequestSignKey,
|
||||||
clientConstructor: e.Config.ClientConstructor,
|
clientConstructor: e.Config.ClientConstructor,
|
||||||
commonPrm: e.CommonPrm,
|
commonPrm: e.CommonPrm,
|
||||||
nodeInfo: clientNodeInfo,
|
nodeInfo: clientNodeInfo,
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {
|
||||||
|
|
||||||
ownerKey, err := keys.NewPrivateKey()
|
ownerKey, err := keys.NewPrivateKey()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
nodeKey, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
|
||||||
RemotePool: pool,
|
RemotePool: pool,
|
||||||
Logger: log,
|
Logger: log,
|
||||||
ClientConstructor: clientConstructor{vectors: ns},
|
ClientConstructor: clientConstructor{vectors: ns},
|
||||||
|
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
|
||||||
},
|
},
|
||||||
PlacementOpts: append(
|
PlacementOpts: append(
|
||||||
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},
|
||||||
|
|
|
@ -100,12 +100,19 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
|
||||||
|
|
||||||
// ForgetTokens forgets all the tokens read from the request's
|
// ForgetTokens forgets all the tokens read from the request's
|
||||||
// meta information before.
|
// meta information before.
|
||||||
func (p *CommonPrm) ForgetTokens() {
|
func (p *CommonPrm) ForgetTokens() func() {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
|
tk := p.token
|
||||||
|
br := p.bearer
|
||||||
p.token = nil
|
p.token = nil
|
||||||
p.bearer = nil
|
p.bearer = nil
|
||||||
|
return func() {
|
||||||
|
p.token = tk
|
||||||
|
p.bearer = br
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return func() {}
|
||||||
|
}
|
||||||
|
|
||||||
func CommonPrmFromV2(req interface {
|
func CommonPrmFromV2(req interface {
|
||||||
GetMetaHeader() *session.RequestMetaHeader
|
GetMetaHeader() *session.RequestMetaHeader
|
||||||
|
|
|
@ -3,6 +3,7 @@ package placement
|
||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
|
||||||
raw, ok := c.containerCache.Get(cnr)
|
raw, ok := c.containerCache.Get(cnr)
|
||||||
c.mtx.Unlock()
|
c.mtx.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
return raw, nil
|
return c.cloneResult(raw), nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.lastEpoch = nm.Epoch()
|
c.lastEpoch = nm.Epoch()
|
||||||
|
@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
|
||||||
c.containerCache.Add(cnr, cn)
|
c.containerCache.Add(cnr, cn)
|
||||||
}
|
}
|
||||||
c.mtx.Unlock()
|
c.mtx.Unlock()
|
||||||
return cn, nil
|
return c.cloneResult(cn), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
|
||||||
|
result := make([][]netmapSDK.NodeInfo, len(nodes))
|
||||||
|
for repIdx := range nodes {
|
||||||
|
result[repIdx] = slices.Clone(nodes[repIdx])
|
||||||
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue