Add EC replication #1129

Merged
dstepanov-yadro merged 7 commits from dstepanov-yadro/frostfs-node:feat/ec_restore into master 2024-09-04 19:51:08 +00:00
Showing only changes of commit 0e42126ddc - Show all commits

View file

@ -167,17 +167,29 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
token := obj.SessionToken() token := obj.SessionToken()
ownerID := obj.OwnerID() ownerID := obj.OwnerID()
if token == nil && obj.ECHeader() != nil {
role, err := v.isIROrContainerNode(obj, binKey)
if err != nil {
return err
}
if role == acl.RoleContainer {
// EC part could be restored or created by container node, so ownerID could not match object signature
return nil
}
return v.checkOwnerKey(ownerID, key)
}
if token == nil || !token.AssertAuthKey(&key) { if token == nil || !token.AssertAuthKey(&key) {
return v.checkOwnerKey(ownerID, key) return v.checkOwnerKey(ownerID, key)
} }
if v.verifyTokenIssuer { if v.verifyTokenIssuer {
signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey) role, err := v.isIROrContainerNode(obj, binKey)
if err != nil { if err != nil {
return err return err
} }
if signerIsIROrContainerNode { if role == acl.RoleContainer || role == acl.RoleInnerRing {
return nil return nil
} }
@ -190,10 +202,10 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
return nil return nil
} }
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) { func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (acl.Role, error) {
cnrID, containerIDSet := obj.ContainerID() cnrID, containerIDSet := obj.ContainerID()
if !containerIDSet { if !containerIDSet {
return false, errNilCID return acl.RoleOthers, errNilCID
} }
cnrIDBin := make([]byte, sha256.Size) cnrIDBin := make([]byte, sha256.Size)
@ -201,14 +213,14 @@ func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey [
cnr, err := v.containers.Get(cnrID) cnr, err := v.containers.Get(cnrID)
if err != nil { if err != nil {
return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err) return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
} }
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value) res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
if err != nil { if err != nil {
return false, err return acl.RoleOthers, err
} }
return res.Role == acl.RoleContainer || res.Role == acl.RoleInnerRing, nil return res.Role, nil
} }
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error { func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {