Override SuccessAfter for non-regular objects in EC containers support/v0.42 #1304
2 changed files with 16 additions and 4 deletions
|
@ -19,6 +19,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -229,6 +230,9 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
if len(copiesNumber) > 0 && !result.isEC {
|
||||
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
|
||||
}
|
||||
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
||||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||
|
||||
objID, ok := obj.ID()
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
|
@ -212,10 +213,10 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
|||
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
|
||||
return p.newECWriter(prm)
|
||||
}
|
||||
return p.newDefaultObjectWriter(prm)
|
||||
return p.newDefaultObjectWriter(prm, false)
|
||||
}
|
||||
|
||||
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
|
||||
var relay func(context.Context, nodeDesc) error
|
||||
if p.relay != nil {
|
||||
relay = func(ctx context.Context, node nodeDesc) error {
|
||||
|
@ -232,9 +233,16 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWri
|
|||
}
|
||||
}
|
||||
|
||||
traverseOpts := prm.traverseOpts
|
||||
if forECPlacement && !prm.common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||
}
|
||||
|
||||
return &distributedTarget{
|
||||
cfg: p.cfg,
|
||||
placementOpts: prm.traverseOpts,
|
||||
placementOpts: traverseOpts,
|
||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||
if node.local {
|
||||
return localTarget{
|
||||
|
@ -266,7 +274,7 @@ func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
|||
commonPrm: prm.common,
|
||||
relay: p.relay,
|
||||
},
|
||||
repWriter: p.newDefaultObjectWriter(prm),
|
||||
repWriter: p.newDefaultObjectWriter(prm, true),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue