[#1064] putsvc: Add EC put

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-03-28 13:46:19 +03:00 committed by Evgenii Stratonikov
parent 39da643354
commit 1c5e0f90aa
11 changed files with 452 additions and 23 deletions

1
go.mod
View file

@ -85,6 +85,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-cid v0.4.1 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect

2
go.sum
View file

@ -133,6 +133,8 @@ github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.12.1 h1:NhWgum1efX1x58daOBGCFWcxtEhOhXKKl1HAPQUp03Q=
github.com/klauspost/reedsolomon v1.12.1/go.mod h1:nEi5Kjb6QqtbofI6s+cbG/j1da11c96IBYBSnVGtuBs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=

View file

@ -579,4 +579,6 @@ const (
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node" EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node" EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node" EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
ECFailedToSendToContainerNode = "failed to send EC object to container node"
ECFailedToSaveECPart = "failed to save EC part"
) )

11
pkg/core/container/ec.go Normal file
View file

@ -0,0 +1,11 @@
package container
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// IsECContainer returns True if container has erasure coding policy.
func IsECContainer(cnr containerSDK.Container) bool {
return policy.IsECPlacement(cnr.PlacementPolicy())
}

13
pkg/core/object/ec.go Normal file
View file

@ -0,0 +1,13 @@
package object
import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// IsECSupported returns True if EC supported for object.
//
// EC supported only for regular, not linking objects.
func IsECSupported(obj *objectSDK.Object) bool {
return obj.Type() == objectSDK.TypeRegular &&
len(obj.Children()) == 0
}

20
pkg/core/policy/ec.go Normal file
View file

@ -0,0 +1,20 @@
package policy
import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
// IsECPlacement returns True if policy is erasure coding policy.
func IsECPlacement(policy netmapSDK.PlacementPolicy) bool {
return policy.NumberOfReplicas() == 1 && policy.ReplicaDescriptor(0).GetECDataCount() > 0
}
// ECDataCount returns EC data count for EC placement policy.
func ECDataCount(policy netmapSDK.PlacementPolicy) int {
return int(policy.ReplicaDescriptor(0).GetECDataCount())
}
// ECParityCount returns EC parity count for EC placement policy.
func ECParityCount(policy netmapSDK.PlacementPolicy) int {
return int(policy.ReplicaDescriptor(0).GetECParityCount())
}

View file

@ -0,0 +1,265 @@
package putsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var _ transformer.ObjectWriter = (*ecWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
type ecWriter struct {
cfg *cfg
placementOpts []placement.Option
container containerSDK.Container
key *ecdsa.PrivateKey
commonPrm *svcutil.CommonPrm
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
objMeta object.ContentMeta
objMetaValid bool
}
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx)
if err != nil {
return err
}
if relayed {
return nil
}
if !object.IsECSupported(obj) {
// must be resolved by caller
return errUnsupportedECObject
}
if !e.objMetaValid {
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
}
e.objMetaValid = true
}
if obj.ECHeader() != nil {
return e.writeECPart(ctx, obj)
}
return e.writeRawObject(ctx, obj)
}
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
if e.relay == nil {
return false, nil
}
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
if err != nil {
return false, err
}
if currentNodeIsContainerNode {
// object can be splitted or saved local
return false, nil
}
if err := e.relayToContainerNode(ctx); err != nil {
return false, err
}
return true, nil
}
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return false, err
}
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for _, node := range nodes {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
return true, nil
}
}
}
return false, nil
}
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return err
}
var lastErr error
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for _, node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
c, err := e.cfg.clientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
defer close(completed)
err = e.relay(ctx, info, c)
}); poolErr != nil {
close(completed)
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
return poolErr
}
<-completed
if err == nil {
return nil
}
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
lastErr = err
}
}
if lastErr == nil {
return nil
}
return errIncompletePut{
singleErr: lastErr,
}
}
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
eg.Go(func() error {
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
})
}
if err := eg.Wait(); err != nil {
return errIncompletePut{
singleErr: err,
}
}
return nil
}
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
// now only single EC policy is supported
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
if err != nil {
return err
}
parts, err := c.Split(obj, e.key)
if err != nil {
return err
}
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for idx := range parts {
idx := idx
eg.Go(func() error {
return e.writePart(egCtx, parts[idx], idx, nodes)
})
}
}
if err := eg.Wait(); err != nil {
return errIncompletePut{
singleErr: err,
}
}
return nil
}
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
var err error
node := nodes[partIdx%len(nodes)]
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
err = e.writePartLocal(ctx, obj)
} else {
err = e.writePartRemote(ctx, obj, node)
}
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err))
return err
}
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error
localTarget := localTarget{
storage: e.cfg.localStore,
}
completed := make(chan interface{})
if poolErr := e.cfg.localPool.Submit(func() {
defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.objMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
}
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
var clientNodeInfo client.NodeInfo
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteTarget{
privateKey: e.key,
clientConstructor: e.cfg.clientConstructor,
commonPrm: e.commonPrm,
nodeInfo: clientNodeInfo,
}
var err error
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
defer close(completed)
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
}

View file

@ -16,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -25,6 +26,7 @@ import (
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/tzhash/tz" "git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -32,7 +34,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum") var (
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
errInvalidECObject = errors.New("object must be splitted to EC parts")
)
type putSingleRequestSigner struct { type putSingleRequestSigner struct {
req *objectAPI.PutSingleRequest req *objectAPI.PutSingleRequest
@ -148,12 +153,20 @@ func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Ob
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
localOnly := req.GetMetaHeader().GetTTL() <= 1 localOnly := req.GetMetaHeader().GetTTL() <= 1
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
if err != nil { if err != nil {
return err return err
} }
iter := s.cfg.newNodeIterator(placementOptions) if placement.isEC {
return s.saveToECReplicas(ctx, placement, obj, req, meta)
}
return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta)
}
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
@ -167,38 +180,83 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
}) })
} }
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) { func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
var result []placement.Option if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
if len(copiesNumber) > 0 { return errInvalidECObject
result = append(result, placement.WithCopyNumbers(copiesNumber))
} }
commonPrm, err := svcutil.CommonPrmFromV2(req)
if err != nil {
return err
}
key, err := s.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
signer := &putSingleRequestSigner{
req: req,
keyStorage: s.keyStorage,
signer: &sync.Once{},
}
w := ecWriter{
cfg: s.cfg,
placementOpts: placement.placementOptions,
objMeta: meta,
objMetaValid: true,
commonPrm: commonPrm,
container: placement.container,
key: key,
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
},
}
return w.WriteObject(ctx, obj)
}
type putSinglePlacement struct {
placementOptions []placement.Option
isEC bool
container containerSDK.Container
}
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
var result putSinglePlacement
cnrID, ok := obj.ContainerID() cnrID, ok := obj.ContainerID()
if !ok { if !ok {
return nil, errors.New("missing container ID") return result, errors.New("missing container ID")
} }
cnrInfo, err := s.cnrSrc.Get(cnrID) cnrInfo, err := s.cnrSrc.Get(cnrID)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get container by ID: %w", err) return result, fmt.Errorf("could not get container by ID: %w", err)
} }
result = append(result, placement.ForContainer(cnrInfo.Value)) result.container = cnrInfo.Value
result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj)
if len(copiesNumber) > 0 && !result.isEC {
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
}
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
objID, ok := obj.ID() objID, ok := obj.ID()
if !ok { if !ok {
return nil, errors.New("missing object ID") return result, errors.New("missing object ID")
} }
result = append(result, placement.ForObject(objID)) if obj.ECHeader() != nil {
objID = obj.ECHeader().Parent()
}
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc) latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get latest network map: %w", err) return result, fmt.Errorf("could not get latest network map: %w", err)
} }
builder := placement.NewNetworkMapBuilder(latestNetmap) builder := placement.NewNetworkMapBuilder(latestNetmap)
if localOnly { if localOnly {
result = append(result, placement.SuccessAfter(1)) result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys) builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
} }
result = append(result, placement.UseBuilder(builder)) result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
return result, nil return result, nil
} }

View file

@ -7,7 +7,9 @@ import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
@ -19,7 +21,7 @@ import (
type Streamer struct { type Streamer struct {
*cfg *cfg
sessionKey *ecdsa.PrivateKey privateKey *ecdsa.PrivateKey
target transformer.ChunkedObjectWriter target transformer.ChunkedObjectWriter
@ -76,6 +78,12 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
p.relay = prm.relay p.relay = prm.relay
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
p.privateKey = nodeKey
// prepare untrusted-Put object target // prepare untrusted-Put object target
p.target = &validatingPreparedTarget{ p.target = &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)), nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
@ -102,7 +110,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
} }
} }
sessionKey, err := p.keyStorage.GetKey(sessionInfo) key, err := p.keyStorage.GetKey(sessionInfo)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not receive session key: %w", p, err) return fmt.Errorf("(%T) could not receive session key: %w", p, err)
} }
@ -116,7 +124,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
if sToken == nil { if sToken == nil {
var ownerSession user.ID var ownerSession user.ID
user.IDFromKey(&ownerSession, sessionKey.PublicKey) user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) { if !ownerObj.Equals(ownerSession) {
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
@ -127,11 +135,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
} }
} }
p.sessionKey = sessionKey p.privateKey = key
p.target = &validatingTarget{ p.target = &validatingTarget{
fmt: p.fmtValidator, fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: sessionKey, Key: key,
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) }, NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
NetworkState: p.networkState, NetworkState: p.networkState,
MaxSize: p.maxPayloadSz, MaxSize: p.maxPayloadSz,
@ -171,7 +179,12 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
placement.ForContainer(prm.cnr), placement.ForContainer(prm.cnr),
) )
if id, ok := prm.hdr.ID(); ok { if ech := prm.hdr.ECHeader(); ech != nil {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.hdr.ID(); ok {
prm.traverseOpts = append(prm.traverseOpts, prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object // set identifier of the processing object
placement.ForObject(id), placement.ForObject(id),
@ -196,6 +209,13 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
} }
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error var relay func(context.Context, nodeDesc) error
if p.relay != nil { if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error { relay = func(ctx context.Context, node nodeDesc) error {
@ -223,7 +243,7 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
} }
rt := &remoteTarget{ rt := &remoteTarget{
privateKey: p.sessionKey, privateKey: p.privateKey,
commonPrm: prm.common, commonPrm: prm.common,
clientConstructor: p.clientConstructor, clientConstructor: p.clientConstructor,
} }
@ -236,6 +256,20 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
} }
} }
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ecWriter{
cfg: p.cfg,
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: prm.cnr,
key: p.privateKey,
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm),
}
}
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil { if p.target == nil {
return errNotInit return errNotInit

View file

@ -0,0 +1,23 @@
package putsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
var _ transformer.ObjectWriter = (*objectWriterDispatcher)(nil)
type objectWriterDispatcher struct {
ecWriter transformer.ObjectWriter
repWriter transformer.ObjectWriter
}
func (m *objectWriterDispatcher) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
if object.IsECSupported(obj) {
return m.ecWriter.WriteObject(ctx, obj)
}
return m.repWriter.WriteObject(ctx, obj)
}

View file

@ -137,7 +137,7 @@ func defaultCopiesVector(policy netmap.PlacementPolicy) []int {
copyVector := make([]int, 0, replNum) copyVector := make([]int, 0, replNum)
for i := 0; i < replNum; i++ { for i := 0; i < replNum; i++ {
copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects())) copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()+policy.ReplicaDescriptor(i).GetECDataCount()+policy.ReplicaDescriptor(i).GetECParityCount()))
} }
return copyVector return copyVector