diff --git a/pkg/services/object/put/builder.go b/pkg/services/object/put/builder.go new file mode 100644 index 00000000..64baf4e0 --- /dev/null +++ b/pkg/services/object/put/builder.go @@ -0,0 +1,54 @@ +package putsvc + +import ( + "context" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" +) + +var _ transformer.ChunkedObjectWriter = (*inMemoryObjectBuilder)(nil) + +type inMemoryObjectBuilder struct { + objectWriter transformer.ObjectWriter + payload *payload + + obj *objectSDK.Object +} + +func newInMemoryObjectBuilder(objectWriter transformer.ObjectWriter) *inMemoryObjectBuilder { + return &inMemoryObjectBuilder{ + objectWriter: objectWriter, + payload: getPayload(), + } +} + +func (b *inMemoryObjectBuilder) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { + defer func() { + putPayload(b.payload) + b.payload = nil + }() + + b.obj.SetPayload(b.payload.Data) + + if err := b.objectWriter.WriteObject(ctx, b.obj); err != nil { + return nil, err + } + + id, _ := b.obj.ID() + return &transformer.AccessIdentifiers{ + SelfID: id, + }, nil +} + +func (b *inMemoryObjectBuilder) Write(_ context.Context, p []byte) (int, error) { + b.payload.Data = append(b.payload.Data, p...) + + return len(p), nil +} + +func (b *inMemoryObjectBuilder) WriteHeader(_ context.Context, obj *objectSDK.Object) error { + b.obj = obj + + return nil +} diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 509f4aee..c71427b6 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -7,7 +7,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" ) type preparedObjectTarget interface { @@ -15,16 +14,13 @@ type preparedObjectTarget interface { } type distributedTarget struct { - placementOpts []placement.Option - extraBroadcastEnabled bool + placementOpts []placement.Option obj *objectSDK.Object objMeta object.ContentMeta *cfg - payload *payload - nodeTargetInitializer func(nodeDesc) preparedObjectTarget relay func(context.Context, nodeDesc) error @@ -91,36 +87,6 @@ func (x errIncompletePut) Error() string { return commonMsg } -func (t *distributedTarget) WriteHeader(_ context.Context, obj *objectSDK.Object) error { - t.obj = obj - - return nil -} - -func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) { - t.payload.Data = append(t.payload.Data, p...) - - return len(p), nil -} - -func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { - defer func() { - putPayload(t.payload) - t.payload = nil - }() - - t.obj.SetPayload(t.payload.Data) - - if err := t.WriteObject(ctx, t.obj); err != nil { - return nil, err - } - - id, _ := t.obj.ID() - return &transformer.AccessIdentifiers{ - SelfID: id, - }, nil -} - // WriteObject implements the transformer.ObjectWriter interface. func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { t.obj = obj diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 90d580a1..f32b2ab9 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -12,7 +12,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" ) @@ -79,7 +78,7 @@ func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { // prepare untrusted-Put object target p.target = &validatingPreparedTarget{ - nextTarget: p.newCommonTarget(prm), + nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)), fmt: p.fmtValidator, maxPayloadSz: p.maxPayloadSz, @@ -133,7 +132,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ Key: sessionKey, - NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) }, + NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) }, NetworkState: p.networkState, MaxSize: p.maxPayloadSz, WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), @@ -196,7 +195,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return nil } -func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget { +func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { var relay func(context.Context, nodeDesc) error if p.relay != nil { relay = func(ctx context.Context, node nodeDesc) error { @@ -213,16 +212,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget { } } - // enable additional container broadcast on non-local operation - // if object has TOMBSTONE or LOCK type. - typ := prm.hdr.Type() - withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock) - return &distributedTarget{ - cfg: p.cfg, - placementOpts: prm.traverseOpts, - extraBroadcastEnabled: withBroadcast, - payload: getPayload(), + cfg: p.cfg, + placementOpts: prm.traverseOpts, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { if node.local { return localTarget{