Compare commits

...

5 commits

Author SHA1 Message Date
b4adf43557
[#1451] placement: Return copy of slice from container nodes cache
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 1m53s
DCO action / DCO (pull_request) Successful in 2m8s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m12s
Vulncheck / Vulncheck (pull_request) Successful in 3m6s
Build / Build Components (pull_request) Successful in 3m23s
Tests and linters / Staticcheck (pull_request) Successful in 3m17s
Tests and linters / Tests (pull_request) Successful in 3m21s
Tests and linters / gopls check (pull_request) Successful in 3m35s
Tests and linters / Lint (pull_request) Successful in 4m15s
Tests and linters / Tests with -race (pull_request) Successful in 6m31s
Nodes from cache could be changed by traverser, if no objectID specified.
So it is required to return copy of cache's slice.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 10:41:28 +03:00
bec16a7e7c
[#1451] ec: Check all parts are saved
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 10:41:27 +03:00
26c728363c
[#1451] ape: Drop unused
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 10:41:27 +03:00
de3a89a0ba
[#1451] ape: Perform strict APE checks for EC parts
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 10:41:26 +03:00
0befda5f4f
[#1451] writer: Sign EC parts with node's private key
As EC put request may be processed only by container node, so sign requests
with current node private to not to perform APE checks.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-06 10:41:26 +03:00
7 changed files with 89 additions and 33 deletions

View file

@ -67,9 +67,6 @@ type Prm struct {
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
SoftAPECheck bool
// If true, object headers will not retrieved from storage engine.
WithoutHeaderRequest bool
// The request's bearer token. It is used in order to check APE overrides with the token.
BearerToken *bearer.Token

View file

@ -695,6 +695,7 @@ func TestPutECChunk(t *testing.T) {
nm := &netmapStub{
currentEpoch: 100,
netmaps: map[uint64]*netmapSDK.NetMap{
99: netmap,
100: netmap,
},
}

View file

@ -3,6 +3,7 @@ package ape
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"net"
"strconv"
@ -11,6 +12,7 @@ import (
aperequest "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/request"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -24,6 +26,8 @@ import (
var defaultRequest = aperequest.Request{}
var errECMissingParentObjectID = errors.New("missing EC parent object ID")
func nativeSchemaRole(role acl.Role) string {
switch role {
case acl.RoleOwner:
@ -116,13 +120,16 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
var header *objectV2.Header
if prm.Header != nil {
header = prm.Header
} else if prm.Object != nil && !prm.WithoutHeaderRequest {
} else if prm.Object != nil {
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object, true)
if err == nil {
header = headerObjSDK.ToV2().GetHeader()
}
}
header = c.fillHeaderWithECParent(ctx, prm, header)
header, err := c.fillHeaderWithECParent(ctx, prm, header)
if err != nil {
return defaultRequest, fmt.Errorf("get EC parent header: %w", err)
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: prm.SenderKey,
nativeschema.PropertyKeyActorRole: prm.Role,
@ -133,7 +140,6 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
reqProps[xheadKey] = xhead.GetValue()
}
var err error
reqProps, err = c.fillWithUserClaimTags(reqProps, prm)
if err != nil {
return defaultRequest, err
@ -155,35 +161,43 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
), nil
}
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) *objectV2.Header {
func (c *checkerImpl) fillHeaderWithECParent(ctx context.Context, prm Prm, header *objectV2.Header) (*objectV2.Header, error) {
if header == nil {
return header
return header, nil
}
if header.GetEC() == nil {
return header
}
if prm.Role == nativeschema.PropertyValueContainerRoleContainer ||
prm.Role == nativeschema.PropertyValueContainerRoleIR {
return header
return header, nil
}
parentObjRefID := header.GetEC().Parent
if parentObjRefID == nil {
return header
return nil, errECMissingParentObjectID
}
var parentObjID oid.ID
if err := parentObjID.ReadFromV2(*parentObjRefID); err != nil {
return header
return nil, fmt.Errorf("EC parent object ID format error: %w", err)
}
// only container node have access to collect parent object
contNode, err := c.currentNodeIsContainerNode(prm.Container)
if err != nil || !contNode {
return header
if err != nil {
return nil, fmt.Errorf("check container node status: %w", err)
}
if !contNode {
return header, nil
}
parentObj, err := c.headerProvider.GetHeader(ctx, prm.Container, parentObjID, false)
if err != nil {
return header
if isLogicalError(err) {
return header, nil
}
return nil, fmt.Errorf("EC parent header request: %w", err)
}
return parentObj.ToV2().GetHeader()
return parentObj.ToV2().GetHeader(), nil
}
func isLogicalError(err error) bool {
var errObjRemoved *apistatus.ObjectAlreadyRemoved
var errObjNotFound *apistatus.ObjectNotFound
return errors.As(err, &errObjRemoved) || errors.As(err, &errObjNotFound)
}
func (c *checkerImpl) currentNodeIsContainerNode(cnrID cid.ID) (bool, error) {

View file

@ -25,7 +25,10 @@ import (
var _ transformer.ObjectWriter = (*ECWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
var (
errUnsupportedECObject = errors.New("object is not supported for erasure coding")
errFailedToSaveAllECParts = errors.New("failed to save all EC parts")
)
type ECWriter struct {
Config *Config
@ -37,10 +40,12 @@ type ECWriter struct {
ObjectMeta object.ContentMeta
ObjectMetaValid bool
remoteRequestSignKey *ecdsa.PrivateKey
}
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj)
relayed, isContainerNode, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil {
return err
}
@ -60,23 +65,35 @@ func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
e.ObjectMetaValid = true
}
if isContainerNode {
restoreTokens := e.CommonPrm.ForgetTokens()
defer restoreTokens()
// As request executed on container node, so sign request with container key.
e.remoteRequestSignKey, err = e.Config.KeyStorage.GetKey(nil)
if err != nil {
return err
}
} else {
e.remoteRequestSignKey = e.Key
}
if obj.ECHeader() != nil {
return e.writeECPart(ctx, obj)
}
return e.writeRawObject(ctx, obj)
}
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.Relay == nil {
return false, nil
}
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, bool, error) {
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
if err != nil {
return false, err
return false, false, err
}
if currentNodeIsContainerNode {
// object can be splitted or saved local
return false, nil
return false, true, nil
}
if e.Relay == nil {
return false, currentNodeIsContainerNode, nil
}
objID := object.AddressOf(obj).Object()
var index uint32
@ -85,9 +102,9 @@ func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
index = obj.ECHeader().Index()
}
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err
return false, false, err
}
return true, nil
return true, currentNodeIsContainerNode, nil
}
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
@ -235,6 +252,13 @@ func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
singleErr: err,
}
}
for idx := range partsProcessed {
if !partsProcessed[idx].Load() {
return errIncompletePut{
singleErr: errFailedToSaveAllECParts,
}
}
}
return nil
}
@ -338,7 +362,7 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteWriter{
privateKey: e.Key,
privateKey: e.remoteRequestSignKey,
clientConstructor: e.Config.ClientConstructor,
commonPrm: e.CommonPrm,
nodeInfo: clientNodeInfo,

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
@ -127,6 +128,8 @@ func TestECWriter(t *testing.T) {
ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err)
nodeKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err)
@ -141,6 +144,7 @@ func TestECWriter(t *testing.T) {
RemotePool: pool,
Logger: log,
ClientConstructor: clientConstructor{vectors: ns},
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
},
PlacementOpts: append(
[]placement.Option{placement.UseBuilder(builder), placement.ForContainer(cnr)},

View file

@ -100,11 +100,18 @@ func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
// ForgetTokens forgets all the tokens read from the request's
// meta information before.
func (p *CommonPrm) ForgetTokens() {
func (p *CommonPrm) ForgetTokens() func() {
if p != nil {
tk := p.token
br := p.bearer
p.token = nil
p.bearer = nil
return func() {
p.token = tk
p.bearer = br
}
}
return func() {}
}
func CommonPrmFromV2(req interface {

View file

@ -3,6 +3,7 @@ package placement
import (
"crypto/sha256"
"fmt"
"slices"
"sync"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -44,7 +45,7 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
raw, ok := c.containerCache.Get(cnr)
c.mtx.Unlock()
if ok {
return raw, nil
return c.cloneResult(raw), nil
}
} else {
c.lastEpoch = nm.Epoch()
@ -65,5 +66,13 @@ func (c *ContainerNodesCache) ContainerNodes(nm *netmapSDK.NetMap, cnr cid.ID, p
c.containerCache.Add(cnr, cn)
}
c.mtx.Unlock()
return cn, nil
return c.cloneResult(cn), nil
}
func (c *ContainerNodesCache) cloneResult(nodes [][]netmapSDK.NodeInfo) [][]netmapSDK.NodeInfo {
result := make([][]netmapSDK.NodeInfo, len(nodes))
for repIdx := range nodes {
result[repIdx] = slices.Clone(nodes[repIdx])
}
return result
}