From 7da284f3e84d596fca56e93947c98bc74e579706 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 11 Jul 2023 17:32:00 +0300 Subject: [PATCH] [#509] go.mod: Update sdk-go Signed-off-by: Evgenii Stratonikov --- go.mod | 2 +- go.sum | Bin 99007 -> 99007 bytes pkg/services/object/put/distributed.go | 31 +++++++++++++++---------- pkg/services/object/put/streamer.go | 6 ++--- pkg/services/object/put/validation.go | 4 ++-- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 9a53bb8fa..e2735b223 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230704092742-285516a94ebe git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230627134746-36f3d39c406a git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230706140617-98cab7ed6166 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230711142135-998fe1a7ab31 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/cheggaaa/pb v1.0.29 diff --git a/go.sum b/go.sum index 31a4cb9e1691022a8766fb91c20b9c3e33d5760a..ddbc776336c5b9c02464f6cba41387ea84b1f6d2 100644 GIT binary patch delta 179 zcmdnr%C^6iZG(lNi=m;RiIJhPsjj7^MOvz1qIqJHv7thSp;bVzM_5Uvi*Z@GkzYts zO1M*Kl|gu6j$ycmiBVp#bAYxT3RG0CYh(Em>HUxDP$O0c?6fc=sUY46@?a*8g?S!+h32_cVab^(c{z?iGZiM+2yr#*3T@XF IVw_X}0Jo?%hX4Qo diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index c8294c0b8..408ee099c 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -135,18 +135,7 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent t.obj.SetPayload(t.payload.Data) - var err error - - if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { - return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err) - } - - if len(t.obj.Children()) > 0 { - // enabling extra broadcast for linking objects - t.traversal.extraBroadcastEnabled = true - } - - if err := t.iteratePlacement(ctx); err != nil { + if err := t.WriteObject(ctx, t.obj); err != nil { return nil, err } @@ -156,6 +145,24 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent }, nil } +// WriteObject implements the transformer.ObjectWriter interface. +func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { + t.obj = obj + + var err error + + if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { + return fmt.Errorf("(%T) could not validate payload content: %w", t, err) + } + + if len(t.obj.Children()) > 0 { + // enabling extra broadcast for linking objects + t.traversal.extraBroadcastEnabled = true + } + + return t.iteratePlacement(ctx) +} + func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { if !node.local && t.relay != nil { return t.relay(ctx, node) diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index bf6c20588..2dccafa47 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -22,7 +22,7 @@ type Streamer struct { sessionKey *ecdsa.PrivateKey - target transformer.ObjectTarget + target transformer.ChunkedObjectWriter relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error @@ -129,7 +129,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ Key: sessionKey, - NextTargetInit: func() transformer.ObjectTarget { return p.newCommonTarget(prm) }, + NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) }, NetworkState: p.networkState, MaxSize: p.maxPayloadSz, WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), @@ -192,7 +192,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return nil } -func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { +func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget { var relay func(context.Context, nodeDesc) error if p.relay != nil { relay = func(ctx context.Context, node nodeDesc) error { diff --git a/pkg/services/object/put/validation.go b/pkg/services/object/put/validation.go index d4b08a038..c2b078ef5 100644 --- a/pkg/services/object/put/validation.go +++ b/pkg/services/object/put/validation.go @@ -17,14 +17,14 @@ import ( // validatingTarget validates unprepared object format and content (streaming PUT case). type validatingTarget struct { - nextTarget transformer.ObjectTarget + nextTarget transformer.ChunkedObjectWriter fmt *object.FormatValidator } // validatingPreparedTarget validates prepared object format and content. type validatingPreparedTarget struct { - nextTarget transformer.ObjectTarget + nextTarget transformer.ChunkedObjectWriter fmt *object.FormatValidator