diff --git a/go.mod b/go.mod index 9a53bb8fa..e2735b223 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230704092742-285516a94ebe git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230627134746-36f3d39c406a git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230706140617-98cab7ed6166 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230711142135-998fe1a7ab31 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/cheggaaa/pb v1.0.29 diff --git a/go.sum b/go.sum index 31a4cb9e1..ddbc77633 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230706140617-98cab7ed6166 h1:HSwD/CDbrUp45gQmfn9KYag8zN0GD+HA0l2+U+c3Ayo= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230706140617-98cab7ed6166/go.mod h1:r5Fir/4jCVXzdfOyCUbikSDB99nVqnHNq7mzVcidnlA= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230711142135-998fe1a7ab31 h1:PSHVtyD3vw2NTbdWBUz0Wql1WH42nSCP+4j6PmIgZ8Y= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230711142135-998fe1a7ab31/go.mod h1:MlLlhZb4qbLhUScDngfujUZKhs0/2YKW9D8LTY3BApY= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA= diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index c8294c0b8..408ee099c 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -135,18 +135,7 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent t.obj.SetPayload(t.payload.Data) - var err error - - if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { - return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err) - } - - if len(t.obj.Children()) > 0 { - // enabling extra broadcast for linking objects - t.traversal.extraBroadcastEnabled = true - } - - if err := t.iteratePlacement(ctx); err != nil { + if err := t.WriteObject(ctx, t.obj); err != nil { return nil, err } @@ -156,6 +145,24 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent }, nil } +// WriteObject implements the transformer.ObjectWriter interface. +func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { + t.obj = obj + + var err error + + if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { + return fmt.Errorf("(%T) could not validate payload content: %w", t, err) + } + + if len(t.obj.Children()) > 0 { + // enabling extra broadcast for linking objects + t.traversal.extraBroadcastEnabled = true + } + + return t.iteratePlacement(ctx) +} + func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { if !node.local && t.relay != nil { return t.relay(ctx, node) diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index bf6c20588..2dccafa47 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -22,7 +22,7 @@ type Streamer struct { sessionKey *ecdsa.PrivateKey - target transformer.ObjectTarget + target transformer.ChunkedObjectWriter relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error @@ -129,7 +129,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ Key: sessionKey, - NextTargetInit: func() transformer.ObjectTarget { return p.newCommonTarget(prm) }, + NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) }, NetworkState: p.networkState, MaxSize: p.maxPayloadSz, WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), @@ -192,7 +192,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return nil } -func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { +func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget { var relay func(context.Context, nodeDesc) error if p.relay != nil { relay = func(ctx context.Context, node nodeDesc) error { diff --git a/pkg/services/object/put/validation.go b/pkg/services/object/put/validation.go index d4b08a038..c2b078ef5 100644 --- a/pkg/services/object/put/validation.go +++ b/pkg/services/object/put/validation.go @@ -17,14 +17,14 @@ import ( // validatingTarget validates unprepared object format and content (streaming PUT case). type validatingTarget struct { - nextTarget transformer.ObjectTarget + nextTarget transformer.ChunkedObjectWriter fmt *object.FormatValidator } // validatingPreparedTarget validates prepared object format and content. type validatingPreparedTarget struct { - nextTarget transformer.ObjectTarget + nextTarget transformer.ChunkedObjectWriter fmt *object.FormatValidator