Update SDK #509

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:update-sdk into master 2023-07-26 21:07:59 +00:00
5 changed files with 25 additions and 18 deletions

2
go.mod
View file

@ -6,7 +6,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230704092742-285516a94ebe git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230704092742-285516a94ebe
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230627134746-36f3d39c406a git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230627134746-36f3d39c406a
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230706140617-98cab7ed6166 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230711142135-998fe1a7ab31
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cheggaaa/pb v1.0.29 github.com/cheggaaa/pb v1.0.29

BIN
go.sum

Binary file not shown.

View file

@ -135,18 +135,7 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent
t.obj.SetPayload(t.payload.Data) t.obj.SetPayload(t.payload.Data)
var err error if err := t.WriteObject(ctx, t.obj); err != nil {
if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}
if len(t.obj.Children()) > 0 {
// enabling extra broadcast for linking objects
t.traversal.extraBroadcastEnabled = true
}
if err := t.iteratePlacement(ctx); err != nil {
return nil, err return nil, err
} }
@ -156,6 +145,24 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent
}, nil }, nil
} }
// WriteObject implements the transformer.ObjectWriter interface.
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
t.obj = obj
var err error
if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}
if len(t.obj.Children()) > 0 {
// enabling extra broadcast for linking objects
t.traversal.extraBroadcastEnabled = true
}
return t.iteratePlacement(ctx)
}
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
if !node.local && t.relay != nil { if !node.local && t.relay != nil {
return t.relay(ctx, node) return t.relay(ctx, node)

View file

@ -22,7 +22,7 @@ type Streamer struct {
sessionKey *ecdsa.PrivateKey sessionKey *ecdsa.PrivateKey
target transformer.ObjectTarget target transformer.ChunkedObjectWriter
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
@ -129,7 +129,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
fmt: p.fmtValidator, fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: sessionKey, Key: sessionKey,
NextTargetInit: func() transformer.ObjectTarget { return p.newCommonTarget(prm) }, NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) },
NetworkState: p.networkState, NetworkState: p.networkState,
MaxSize: p.maxPayloadSz, MaxSize: p.maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
@ -192,7 +192,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
return nil return nil
} }
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
var relay func(context.Context, nodeDesc) error var relay func(context.Context, nodeDesc) error
if p.relay != nil { if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error { relay = func(ctx context.Context, node nodeDesc) error {

View file

@ -17,14 +17,14 @@ import (
// validatingTarget validates unprepared object format and content (streaming PUT case). // validatingTarget validates unprepared object format and content (streaming PUT case).
type validatingTarget struct { type validatingTarget struct {
nextTarget transformer.ObjectTarget nextTarget transformer.ChunkedObjectWriter
fmt *object.FormatValidator fmt *object.FormatValidator
} }
// validatingPreparedTarget validates prepared object format and content. // validatingPreparedTarget validates prepared object format and content.
type validatingPreparedTarget struct { type validatingPreparedTarget struct {
nextTarget transformer.ObjectTarget nextTarget transformer.ChunkedObjectWriter
fmt *object.FormatValidator fmt *object.FormatValidator