[#1302] putSvc: Override SuccessAfter for non-regular objects in EC containers

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-09 10:38:45 +03:00 committed by Evgenii Stratonikov
parent 078b5fdd6e
commit 8b4f3d82c9
2 changed files with 16 additions and 4 deletions

View file

@ -19,6 +19,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -229,6 +230,9 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
if len(copiesNumber) > 0 && !result.isEC {
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
}
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
}
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
objID, ok := obj.ID()

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
@ -212,10 +213,10 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm)
return p.newDefaultObjectWriter(prm, false)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error
if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error {
@ -232,9 +233,16 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWri
}
}
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
}
return &distributedTarget{
cfg: p.cfg,
placementOpts: prm.traverseOpts,
placementOpts: traverseOpts,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{
@ -266,7 +274,7 @@ func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm),
repWriter: p.newDefaultObjectWriter(prm, true),
}
}